Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / net / spdy / spdy_session.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/spdy_session.h"
6
7 #include <algorithm>
8 #include <map>
9
10 #include "base/basictypes.h"
11 #include "base/bind.h"
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/metrics/field_trial.h"
16 #include "base/metrics/histogram.h"
17 #include "base/metrics/sparse_histogram.h"
18 #include "base/metrics/stats_counters.h"
19 #include "base/stl_util.h"
20 #include "base/strings/string_number_conversions.h"
21 #include "base/strings/string_util.h"
22 #include "base/strings/stringprintf.h"
23 #include "base/strings/utf_string_conversions.h"
24 #include "base/time/time.h"
25 #include "base/values.h"
26 #include "crypto/ec_private_key.h"
27 #include "crypto/ec_signature_creator.h"
28 #include "net/base/connection_type_histograms.h"
29 #include "net/base/net_log.h"
30 #include "net/base/net_util.h"
31 #include "net/cert/asn1_util.h"
32 #include "net/cert/cert_verify_result.h"
33 #include "net/http/http_log_util.h"
34 #include "net/http/http_network_session.h"
35 #include "net/http/http_server_properties.h"
36 #include "net/http/http_util.h"
37 #include "net/http/transport_security_state.h"
38 #include "net/socket/ssl_client_socket.h"
39 #include "net/spdy/spdy_buffer_producer.h"
40 #include "net/spdy/spdy_frame_builder.h"
41 #include "net/spdy/spdy_http_utils.h"
42 #include "net/spdy/spdy_protocol.h"
43 #include "net/spdy/spdy_session_pool.h"
44 #include "net/spdy/spdy_stream.h"
45 #include "net/ssl/channel_id_service.h"
46 #include "net/ssl/ssl_cipher_suite_names.h"
47 #include "net/ssl/ssl_connection_status_flags.h"
48
49 namespace net {
50
51 namespace {
52
53 const int kReadBufferSize = 8 * 1024;
54 const int kDefaultConnectionAtRiskOfLossSeconds = 10;
55 const int kHungIntervalSeconds = 10;
56
57 // Minimum seconds that unclaimed pushed streams will be kept in memory.
58 const int kMinPushedStreamLifetimeSeconds = 300;
59
60 scoped_ptr<base::ListValue> SpdyHeaderBlockToListValue(
61     const SpdyHeaderBlock& headers,
62     net::NetLog::LogLevel log_level) {
63   scoped_ptr<base::ListValue> headers_list(new base::ListValue());
64   for (SpdyHeaderBlock::const_iterator it = headers.begin();
65        it != headers.end(); ++it) {
66     headers_list->AppendString(
67         it->first + ": " +
68         ElideHeaderValueForNetLog(log_level, it->first, it->second));
69   }
70   return headers_list.Pass();
71 }
72
73 base::Value* NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock* headers,
74                                              bool fin,
75                                              bool unidirectional,
76                                              SpdyPriority spdy_priority,
77                                              SpdyStreamId stream_id,
78                                              NetLog::LogLevel log_level) {
79   base::DictionaryValue* dict = new base::DictionaryValue();
80   dict->Set("headers",
81             SpdyHeaderBlockToListValue(*headers, log_level).release());
82   dict->SetBoolean("fin", fin);
83   dict->SetBoolean("unidirectional", unidirectional);
84   dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
85   dict->SetInteger("stream_id", stream_id);
86   return dict;
87 }
88
89 base::Value* NetLogSpdySynStreamReceivedCallback(
90     const SpdyHeaderBlock* headers,
91     bool fin,
92     bool unidirectional,
93     SpdyPriority spdy_priority,
94     SpdyStreamId stream_id,
95     SpdyStreamId associated_stream,
96     NetLog::LogLevel log_level) {
97   base::DictionaryValue* dict = new base::DictionaryValue();
98   dict->Set("headers",
99             SpdyHeaderBlockToListValue(*headers, log_level).release());
100   dict->SetBoolean("fin", fin);
101   dict->SetBoolean("unidirectional", unidirectional);
102   dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
103   dict->SetInteger("stream_id", stream_id);
104   dict->SetInteger("associated_stream", associated_stream);
105   return dict;
106 }
107
108 base::Value* NetLogSpdySynReplyOrHeadersReceivedCallback(
109     const SpdyHeaderBlock* headers,
110     bool fin,
111     SpdyStreamId stream_id,
112     NetLog::LogLevel log_level) {
113   base::DictionaryValue* dict = new base::DictionaryValue();
114   dict->Set("headers",
115             SpdyHeaderBlockToListValue(*headers, log_level).release());
116   dict->SetBoolean("fin", fin);
117   dict->SetInteger("stream_id", stream_id);
118   return dict;
119 }
120
121 base::Value* NetLogSpdySessionCloseCallback(int net_error,
122                                             const std::string* description,
123                                             NetLog::LogLevel /* log_level */) {
124   base::DictionaryValue* dict = new base::DictionaryValue();
125   dict->SetInteger("net_error", net_error);
126   dict->SetString("description", *description);
127   return dict;
128 }
129
130 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair,
131                                        NetLog::LogLevel /* log_level */) {
132   base::DictionaryValue* dict = new base::DictionaryValue();
133   dict->SetString("host", host_pair->first.ToString());
134   dict->SetString("proxy", host_pair->second.ToPacString());
135   return dict;
136 }
137
138 base::Value* NetLogSpdyInitializedCallback(NetLog::Source source,
139                                            const NextProto protocol_version,
140                                            NetLog::LogLevel /* log_level */) {
141   base::DictionaryValue* dict = new base::DictionaryValue();
142   if (source.IsValid()) {
143     source.AddToEventParameters(dict);
144   }
145   dict->SetString("protocol",
146                   SSLClientSocket::NextProtoToString(protocol_version));
147   return dict;
148 }
149
150 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair,
151                                         bool clear_persisted,
152                                         NetLog::LogLevel /* log_level */) {
153   base::DictionaryValue* dict = new base::DictionaryValue();
154   dict->SetString("host", host_port_pair.ToString());
155   dict->SetBoolean("clear_persisted", clear_persisted);
156   return dict;
157 }
158
159 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id,
160                                        const SpdyMajorVersion protocol_version,
161                                        SpdySettingsFlags flags,
162                                        uint32 value,
163                                        NetLog::LogLevel /* log_level */) {
164   base::DictionaryValue* dict = new base::DictionaryValue();
165   dict->SetInteger("id",
166                    SpdyConstants::SerializeSettingId(protocol_version, id));
167   dict->SetInteger("flags", flags);
168   dict->SetInteger("value", value);
169   return dict;
170 }
171
172 base::Value* NetLogSpdySendSettingsCallback(
173     const SettingsMap* settings,
174     const SpdyMajorVersion protocol_version,
175     NetLog::LogLevel /* log_level */) {
176   base::DictionaryValue* dict = new base::DictionaryValue();
177   base::ListValue* settings_list = new base::ListValue();
178   for (SettingsMap::const_iterator it = settings->begin();
179        it != settings->end(); ++it) {
180     const SpdySettingsIds id = it->first;
181     const SpdySettingsFlags flags = it->second.first;
182     const uint32 value = it->second.second;
183     settings_list->Append(new base::StringValue(base::StringPrintf(
184         "[id:%u flags:%u value:%u]",
185         SpdyConstants::SerializeSettingId(protocol_version, id),
186         flags,
187         value)));
188   }
189   dict->Set("settings", settings_list);
190   return dict;
191 }
192
193 base::Value* NetLogSpdyWindowUpdateFrameCallback(
194     SpdyStreamId stream_id,
195     uint32 delta,
196     NetLog::LogLevel /* log_level */) {
197   base::DictionaryValue* dict = new base::DictionaryValue();
198   dict->SetInteger("stream_id", static_cast<int>(stream_id));
199   dict->SetInteger("delta", delta);
200   return dict;
201 }
202
203 base::Value* NetLogSpdySessionWindowUpdateCallback(
204     int32 delta,
205     int32 window_size,
206     NetLog::LogLevel /* log_level */) {
207   base::DictionaryValue* dict = new base::DictionaryValue();
208   dict->SetInteger("delta", delta);
209   dict->SetInteger("window_size", window_size);
210   return dict;
211 }
212
213 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id,
214                                     int size,
215                                     bool fin,
216                                     NetLog::LogLevel /* log_level */) {
217   base::DictionaryValue* dict = new base::DictionaryValue();
218   dict->SetInteger("stream_id", static_cast<int>(stream_id));
219   dict->SetInteger("size", size);
220   dict->SetBoolean("fin", fin);
221   return dict;
222 }
223
224 base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id,
225                                    int status,
226                                    const std::string* description,
227                                    NetLog::LogLevel /* log_level */) {
228   base::DictionaryValue* dict = new base::DictionaryValue();
229   dict->SetInteger("stream_id", static_cast<int>(stream_id));
230   dict->SetInteger("status", status);
231   dict->SetString("description", *description);
232   return dict;
233 }
234
235 base::Value* NetLogSpdyPingCallback(SpdyPingId unique_id,
236                                     bool is_ack,
237                                     const char* type,
238                                     NetLog::LogLevel /* log_level */) {
239   base::DictionaryValue* dict = new base::DictionaryValue();
240   dict->SetInteger("unique_id", unique_id);
241   dict->SetString("type", type);
242   dict->SetBoolean("is_ack", is_ack);
243   return dict;
244 }
245
246 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,
247                                       int active_streams,
248                                       int unclaimed_streams,
249                                       SpdyGoAwayStatus status,
250                                       NetLog::LogLevel /* log_level */) {
251   base::DictionaryValue* dict = new base::DictionaryValue();
252   dict->SetInteger("last_accepted_stream_id",
253                    static_cast<int>(last_stream_id));
254   dict->SetInteger("active_streams", active_streams);
255   dict->SetInteger("unclaimed_streams", unclaimed_streams);
256   dict->SetInteger("status", static_cast<int>(status));
257   return dict;
258 }
259
260 base::Value* NetLogSpdyPushPromiseReceivedCallback(
261     const SpdyHeaderBlock* headers,
262     SpdyStreamId stream_id,
263     SpdyStreamId promised_stream_id,
264     NetLog::LogLevel log_level) {
265   base::DictionaryValue* dict = new base::DictionaryValue();
266   dict->Set("headers",
267             SpdyHeaderBlockToListValue(*headers, log_level).release());
268   dict->SetInteger("id", stream_id);
269   dict->SetInteger("promised_stream_id", promised_stream_id);
270   return dict;
271 }
272
273 // Helper function to return the total size of an array of objects
274 // with .size() member functions.
275 template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
276   size_t total_size = 0;
277   for (size_t i = 0; i < N; ++i) {
278     total_size += arr[i].size();
279   }
280   return total_size;
281 }
282
283 // Helper class for std:find_if on STL container containing
284 // SpdyStreamRequest weak pointers.
285 class RequestEquals {
286  public:
287   RequestEquals(const base::WeakPtr<SpdyStreamRequest>& request)
288       : request_(request) {}
289
290   bool operator()(const base::WeakPtr<SpdyStreamRequest>& request) const {
291     return request_.get() == request.get();
292   }
293
294  private:
295   const base::WeakPtr<SpdyStreamRequest> request_;
296 };
297
298 // The maximum number of concurrent streams we will ever create.  Even if
299 // the server permits more, we will never exceed this limit.
300 const size_t kMaxConcurrentStreamLimit = 256;
301
302 }  // namespace
303
304 SpdyProtocolErrorDetails MapFramerErrorToProtocolError(
305     SpdyFramer::SpdyError err) {
306   switch(err) {
307     case SpdyFramer::SPDY_NO_ERROR:
308       return SPDY_ERROR_NO_ERROR;
309     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
310       return SPDY_ERROR_INVALID_CONTROL_FRAME;
311     case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
312       return SPDY_ERROR_CONTROL_PAYLOAD_TOO_LARGE;
313     case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
314       return SPDY_ERROR_ZLIB_INIT_FAILURE;
315     case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
316       return SPDY_ERROR_UNSUPPORTED_VERSION;
317     case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
318       return SPDY_ERROR_DECOMPRESS_FAILURE;
319     case SpdyFramer::SPDY_COMPRESS_FAILURE:
320       return SPDY_ERROR_COMPRESS_FAILURE;
321     case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
322       return SPDY_ERROR_GOAWAY_FRAME_CORRUPT;
323     case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
324       return SPDY_ERROR_RST_STREAM_FRAME_CORRUPT;
325     case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
326       return SPDY_ERROR_INVALID_DATA_FRAME_FLAGS;
327     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
328       return SPDY_ERROR_INVALID_CONTROL_FRAME_FLAGS;
329     case SpdyFramer::SPDY_UNEXPECTED_FRAME:
330       return SPDY_ERROR_UNEXPECTED_FRAME;
331     default:
332       NOTREACHED();
333       return static_cast<SpdyProtocolErrorDetails>(-1);
334   }
335 }
336
337 Error MapFramerErrorToNetError(SpdyFramer::SpdyError err) {
338   switch (err) {
339     case SpdyFramer::SPDY_NO_ERROR:
340       return OK;
341     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
342       return ERR_SPDY_PROTOCOL_ERROR;
343     case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
344       return ERR_SPDY_FRAME_SIZE_ERROR;
345     case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
346       return ERR_SPDY_COMPRESSION_ERROR;
347     case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
348       return ERR_SPDY_PROTOCOL_ERROR;
349     case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
350       return ERR_SPDY_COMPRESSION_ERROR;
351     case SpdyFramer::SPDY_COMPRESS_FAILURE:
352       return ERR_SPDY_COMPRESSION_ERROR;
353     case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
354       return ERR_SPDY_PROTOCOL_ERROR;
355     case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
356       return ERR_SPDY_PROTOCOL_ERROR;
357     case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
358       return ERR_SPDY_PROTOCOL_ERROR;
359     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
360       return ERR_SPDY_PROTOCOL_ERROR;
361     case SpdyFramer::SPDY_UNEXPECTED_FRAME:
362       return ERR_SPDY_PROTOCOL_ERROR;
363     default:
364       NOTREACHED();
365       return ERR_SPDY_PROTOCOL_ERROR;
366   }
367 }
368
369 SpdyProtocolErrorDetails MapRstStreamStatusToProtocolError(
370     SpdyRstStreamStatus status) {
371   switch(status) {
372     case RST_STREAM_PROTOCOL_ERROR:
373       return STATUS_CODE_PROTOCOL_ERROR;
374     case RST_STREAM_INVALID_STREAM:
375       return STATUS_CODE_INVALID_STREAM;
376     case RST_STREAM_REFUSED_STREAM:
377       return STATUS_CODE_REFUSED_STREAM;
378     case RST_STREAM_UNSUPPORTED_VERSION:
379       return STATUS_CODE_UNSUPPORTED_VERSION;
380     case RST_STREAM_CANCEL:
381       return STATUS_CODE_CANCEL;
382     case RST_STREAM_INTERNAL_ERROR:
383       return STATUS_CODE_INTERNAL_ERROR;
384     case RST_STREAM_FLOW_CONTROL_ERROR:
385       return STATUS_CODE_FLOW_CONTROL_ERROR;
386     case RST_STREAM_STREAM_IN_USE:
387       return STATUS_CODE_STREAM_IN_USE;
388     case RST_STREAM_STREAM_ALREADY_CLOSED:
389       return STATUS_CODE_STREAM_ALREADY_CLOSED;
390     case RST_STREAM_INVALID_CREDENTIALS:
391       return STATUS_CODE_INVALID_CREDENTIALS;
392     case RST_STREAM_FRAME_SIZE_ERROR:
393       return STATUS_CODE_FRAME_SIZE_ERROR;
394     case RST_STREAM_SETTINGS_TIMEOUT:
395       return STATUS_CODE_SETTINGS_TIMEOUT;
396     case RST_STREAM_CONNECT_ERROR:
397       return STATUS_CODE_CONNECT_ERROR;
398     case RST_STREAM_ENHANCE_YOUR_CALM:
399       return STATUS_CODE_ENHANCE_YOUR_CALM;
400     default:
401       NOTREACHED();
402       return static_cast<SpdyProtocolErrorDetails>(-1);
403   }
404 }
405
406 SpdyGoAwayStatus MapNetErrorToGoAwayStatus(Error err) {
407   switch (err) {
408     case OK:
409       return GOAWAY_NO_ERROR;
410     case ERR_SPDY_PROTOCOL_ERROR:
411       return GOAWAY_PROTOCOL_ERROR;
412     case ERR_SPDY_FLOW_CONTROL_ERROR:
413       return GOAWAY_FLOW_CONTROL_ERROR;
414     case ERR_SPDY_FRAME_SIZE_ERROR:
415       return GOAWAY_FRAME_SIZE_ERROR;
416     case ERR_SPDY_COMPRESSION_ERROR:
417       return GOAWAY_COMPRESSION_ERROR;
418     case ERR_SPDY_INADEQUATE_TRANSPORT_SECURITY:
419       return GOAWAY_INADEQUATE_SECURITY;
420     default:
421       return GOAWAY_PROTOCOL_ERROR;
422   }
423 }
424
425 void SplitPushedHeadersToRequestAndResponse(const SpdyHeaderBlock& headers,
426                                             SpdyMajorVersion protocol_version,
427                                             SpdyHeaderBlock* request_headers,
428                                             SpdyHeaderBlock* response_headers) {
429   DCHECK(response_headers);
430   DCHECK(request_headers);
431   for (SpdyHeaderBlock::const_iterator it = headers.begin();
432        it != headers.end();
433        ++it) {
434     SpdyHeaderBlock* to_insert = response_headers;
435     if (protocol_version == SPDY2) {
436       if (it->first == "url")
437         to_insert = request_headers;
438     } else {
439       const char* host = protocol_version >= SPDY4 ? ":authority" : ":host";
440       static const char* scheme = ":scheme";
441       static const char* path = ":path";
442       if (it->first == host || it->first == scheme || it->first == path)
443         to_insert = request_headers;
444     }
445     to_insert->insert(*it);
446   }
447 }
448
449 SpdyStreamRequest::SpdyStreamRequest() : weak_ptr_factory_(this) {
450   Reset();
451 }
452
453 SpdyStreamRequest::~SpdyStreamRequest() {
454   CancelRequest();
455 }
456
457 int SpdyStreamRequest::StartRequest(
458     SpdyStreamType type,
459     const base::WeakPtr<SpdySession>& session,
460     const GURL& url,
461     RequestPriority priority,
462     const BoundNetLog& net_log,
463     const CompletionCallback& callback) {
464   DCHECK(session);
465   DCHECK(!session_);
466   DCHECK(!stream_);
467   DCHECK(callback_.is_null());
468
469   type_ = type;
470   session_ = session;
471   url_ = url;
472   priority_ = priority;
473   net_log_ = net_log;
474   callback_ = callback;
475
476   base::WeakPtr<SpdyStream> stream;
477   int rv = session->TryCreateStream(weak_ptr_factory_.GetWeakPtr(), &stream);
478   if (rv == OK) {
479     Reset();
480     stream_ = stream;
481   }
482   return rv;
483 }
484
485 void SpdyStreamRequest::CancelRequest() {
486   if (session_)
487     session_->CancelStreamRequest(weak_ptr_factory_.GetWeakPtr());
488   Reset();
489   // Do this to cancel any pending CompleteStreamRequest() tasks.
490   weak_ptr_factory_.InvalidateWeakPtrs();
491 }
492
493 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() {
494   DCHECK(!session_);
495   base::WeakPtr<SpdyStream> stream = stream_;
496   DCHECK(stream);
497   Reset();
498   return stream;
499 }
500
501 void SpdyStreamRequest::OnRequestCompleteSuccess(
502     const base::WeakPtr<SpdyStream>& stream) {
503   DCHECK(session_);
504   DCHECK(!stream_);
505   DCHECK(!callback_.is_null());
506   CompletionCallback callback = callback_;
507   Reset();
508   DCHECK(stream);
509   stream_ = stream;
510   callback.Run(OK);
511 }
512
513 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) {
514   DCHECK(session_);
515   DCHECK(!stream_);
516   DCHECK(!callback_.is_null());
517   CompletionCallback callback = callback_;
518   Reset();
519   DCHECK_NE(rv, OK);
520   callback.Run(rv);
521 }
522
523 void SpdyStreamRequest::Reset() {
524   type_ = SPDY_BIDIRECTIONAL_STREAM;
525   session_.reset();
526   stream_.reset();
527   url_ = GURL();
528   priority_ = MINIMUM_PRIORITY;
529   net_log_ = BoundNetLog();
530   callback_.Reset();
531 }
532
533 SpdySession::ActiveStreamInfo::ActiveStreamInfo()
534     : stream(NULL),
535       waiting_for_syn_reply(false) {}
536
537 SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream)
538     : stream(stream),
539       waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {
540 }
541
542 SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {}
543
544 SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {}
545
546 SpdySession::PushedStreamInfo::PushedStreamInfo(
547     SpdyStreamId stream_id,
548     base::TimeTicks creation_time)
549     : stream_id(stream_id),
550       creation_time(creation_time) {}
551
552 SpdySession::PushedStreamInfo::~PushedStreamInfo() {}
553
554 // static
555 bool SpdySession::CanPool(TransportSecurityState* transport_security_state,
556                           const SSLInfo& ssl_info,
557                           const std::string& old_hostname,
558                           const std::string& new_hostname) {
559   // Pooling is prohibited if the server cert is not valid for the new domain,
560   // and for connections on which client certs were sent. It is also prohibited
561   // when channel ID was sent if the hosts are from different eTLDs+1.
562   if (IsCertStatusError(ssl_info.cert_status))
563     return false;
564
565   if (ssl_info.client_cert_sent)
566     return false;
567
568   if (ssl_info.channel_id_sent &&
569       ChannelIDService::GetDomainForHost(new_hostname) !=
570       ChannelIDService::GetDomainForHost(old_hostname)) {
571     return false;
572   }
573
574   bool unused = false;
575   if (!ssl_info.cert->VerifyNameMatch(new_hostname, &unused))
576     return false;
577
578   std::string pinning_failure_log;
579   if (!transport_security_state->CheckPublicKeyPins(
580           new_hostname,
581           ssl_info.is_issued_by_known_root,
582           ssl_info.public_key_hashes,
583           &pinning_failure_log)) {
584     return false;
585   }
586
587   return true;
588 }
589
590 SpdySession::SpdySession(
591     const SpdySessionKey& spdy_session_key,
592     const base::WeakPtr<HttpServerProperties>& http_server_properties,
593     TransportSecurityState* transport_security_state,
594     bool verify_domain_authentication,
595     bool enable_sending_initial_data,
596     bool enable_compression,
597     bool enable_ping_based_connection_checking,
598     NextProto default_protocol,
599     size_t stream_initial_recv_window_size,
600     size_t initial_max_concurrent_streams,
601     size_t max_concurrent_streams_limit,
602     TimeFunc time_func,
603     const HostPortPair& trusted_spdy_proxy,
604     NetLog* net_log)
605     : in_io_loop_(false),
606       spdy_session_key_(spdy_session_key),
607       pool_(NULL),
608       http_server_properties_(http_server_properties),
609       transport_security_state_(transport_security_state),
610       read_buffer_(new IOBuffer(kReadBufferSize)),
611       stream_hi_water_mark_(kFirstStreamId),
612       last_accepted_push_stream_id_(0),
613       num_pushed_streams_(0u),
614       num_active_pushed_streams_(0u),
615       in_flight_write_frame_type_(DATA),
616       in_flight_write_frame_size_(0),
617       is_secure_(false),
618       certificate_error_code_(OK),
619       availability_state_(STATE_AVAILABLE),
620       read_state_(READ_STATE_DO_READ),
621       write_state_(WRITE_STATE_IDLE),
622       error_on_close_(OK),
623       max_concurrent_streams_(initial_max_concurrent_streams == 0
624                                   ? kInitialMaxConcurrentStreams
625                                   : initial_max_concurrent_streams),
626       max_concurrent_streams_limit_(max_concurrent_streams_limit == 0
627                                         ? kMaxConcurrentStreamLimit
628                                         : max_concurrent_streams_limit),
629       max_concurrent_pushed_streams_(kMaxConcurrentPushedStreams),
630       streams_initiated_count_(0),
631       streams_pushed_count_(0),
632       streams_pushed_and_claimed_count_(0),
633       streams_abandoned_count_(0),
634       total_bytes_received_(0),
635       sent_settings_(false),
636       received_settings_(false),
637       stalled_streams_(0),
638       pings_in_flight_(0),
639       next_ping_id_(1),
640       last_activity_time_(time_func()),
641       last_compressed_frame_len_(0),
642       check_ping_status_pending_(false),
643       send_connection_header_prefix_(false),
644       flow_control_state_(FLOW_CONTROL_NONE),
645       stream_initial_send_window_size_(kSpdyStreamInitialWindowSize),
646       stream_initial_recv_window_size_(stream_initial_recv_window_size == 0
647                                            ? kDefaultInitialRecvWindowSize
648                                            : stream_initial_recv_window_size),
649       session_send_window_size_(0),
650       session_recv_window_size_(0),
651       session_unacked_recv_window_bytes_(0),
652       net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
653       verify_domain_authentication_(verify_domain_authentication),
654       enable_sending_initial_data_(enable_sending_initial_data),
655       enable_compression_(enable_compression),
656       enable_ping_based_connection_checking_(
657           enable_ping_based_connection_checking),
658       protocol_(default_protocol),
659       connection_at_risk_of_loss_time_(
660           base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
661       hung_interval_(base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
662       trusted_spdy_proxy_(trusted_spdy_proxy),
663       time_func_(time_func),
664       weak_factory_(this) {
665   DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
666   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
667   DCHECK(HttpStreamFactory::spdy_enabled());
668   net_log_.BeginEvent(
669       NetLog::TYPE_SPDY_SESSION,
670       base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
671   next_unclaimed_push_stream_sweep_time_ = time_func_() +
672       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
673   // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
674 }
675
676 SpdySession::~SpdySession() {
677   CHECK(!in_io_loop_);
678   DcheckDraining();
679
680   // TODO(akalin): Check connection->is_initialized() instead. This
681   // requires re-working CreateFakeSpdySession(), though.
682   DCHECK(connection_->socket());
683   // With SPDY we can't recycle sockets.
684   connection_->socket()->Disconnect();
685
686   RecordHistograms();
687
688   net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
689 }
690
691 void SpdySession::InitializeWithSocket(
692     scoped_ptr<ClientSocketHandle> connection,
693     SpdySessionPool* pool,
694     bool is_secure,
695     int certificate_error_code) {
696   CHECK(!in_io_loop_);
697   DCHECK_EQ(availability_state_, STATE_AVAILABLE);
698   DCHECK_EQ(read_state_, READ_STATE_DO_READ);
699   DCHECK_EQ(write_state_, WRITE_STATE_IDLE);
700   DCHECK(!connection_);
701
702   DCHECK(certificate_error_code == OK ||
703          certificate_error_code < ERR_IO_PENDING);
704   // TODO(akalin): Check connection->is_initialized() instead. This
705   // requires re-working CreateFakeSpdySession(), though.
706   DCHECK(connection->socket());
707
708   base::StatsCounter spdy_sessions("spdy.sessions");
709   spdy_sessions.Increment();
710
711   connection_ = connection.Pass();
712   is_secure_ = is_secure;
713   certificate_error_code_ = certificate_error_code;
714
715   NextProto protocol_negotiated =
716       connection_->socket()->GetNegotiatedProtocol();
717   if (protocol_negotiated != kProtoUnknown) {
718     protocol_ = protocol_negotiated;
719   }
720   DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
721   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
722
723   if (protocol_ == kProtoSPDY4)
724     send_connection_header_prefix_ = true;
725
726   if (protocol_ >= kProtoSPDY31) {
727     flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION;
728     session_send_window_size_ = kSpdySessionInitialWindowSize;
729     session_recv_window_size_ = kSpdySessionInitialWindowSize;
730   } else if (protocol_ >= kProtoSPDY3) {
731     flow_control_state_ = FLOW_CONTROL_STREAM;
732   } else {
733     flow_control_state_ = FLOW_CONTROL_NONE;
734   }
735
736   buffered_spdy_framer_.reset(
737       new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_),
738                              enable_compression_));
739   buffered_spdy_framer_->set_visitor(this);
740   buffered_spdy_framer_->set_debug_visitor(this);
741   UMA_HISTOGRAM_ENUMERATION(
742       "Net.SpdyVersion2",
743       protocol_ - kProtoSPDYMinimumVersion,
744       kProtoSPDYMaximumVersion - kProtoSPDYMinimumVersion + 1);
745
746   net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_INITIALIZED,
747                     base::Bind(&NetLogSpdyInitializedCallback,
748                                connection_->socket()->NetLog().source(),
749                                protocol_));
750
751   DCHECK_EQ(availability_state_, STATE_AVAILABLE);
752   connection_->AddHigherLayeredPool(this);
753   if (enable_sending_initial_data_)
754     SendInitialData();
755   pool_ = pool;
756
757   // Bootstrap the read loop.
758   base::MessageLoop::current()->PostTask(
759       FROM_HERE,
760       base::Bind(&SpdySession::PumpReadLoop,
761                  weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
762 }
763
764 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
765   if (!verify_domain_authentication_)
766     return true;
767
768   if (availability_state_ == STATE_DRAINING)
769     return false;
770
771   SSLInfo ssl_info;
772   bool was_npn_negotiated;
773   NextProto protocol_negotiated = kProtoUnknown;
774   if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
775     return true;   // This is not a secure session, so all domains are okay.
776
777   return CanPool(transport_security_state_, ssl_info,
778                  host_port_pair().host(), domain);
779 }
780
781 int SpdySession::GetPushStream(
782     const GURL& url,
783     base::WeakPtr<SpdyStream>* stream,
784     const BoundNetLog& stream_net_log) {
785   CHECK(!in_io_loop_);
786
787   stream->reset();
788
789   if (availability_state_ == STATE_DRAINING)
790     return ERR_CONNECTION_CLOSED;
791
792   Error err = TryAccessStream(url);
793   if (err != OK)
794     return err;
795
796   *stream = GetActivePushStream(url);
797   if (*stream) {
798     DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
799     streams_pushed_and_claimed_count_++;
800   }
801   return OK;
802 }
803
804 // {,Try}CreateStream() and TryAccessStream() can be called with
805 // |in_io_loop_| set if a stream is being created in response to
806 // another being closed due to received data.
807
808 Error SpdySession::TryAccessStream(const GURL& url) {
809   if (is_secure_ && certificate_error_code_ != OK &&
810       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
811     RecordProtocolErrorHistogram(
812         PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
813     DoDrainSession(
814         static_cast<Error>(certificate_error_code_),
815         "Tried to get SPDY stream for secure content over an unauthenticated "
816         "session.");
817     return ERR_SPDY_PROTOCOL_ERROR;
818   }
819   return OK;
820 }
821
822 int SpdySession::TryCreateStream(
823     const base::WeakPtr<SpdyStreamRequest>& request,
824     base::WeakPtr<SpdyStream>* stream) {
825   DCHECK(request);
826
827   if (availability_state_ == STATE_GOING_AWAY)
828     return ERR_FAILED;
829
830   if (availability_state_ == STATE_DRAINING)
831     return ERR_CONNECTION_CLOSED;
832
833   Error err = TryAccessStream(request->url());
834   if (err != OK)
835     return err;
836
837   if (!max_concurrent_streams_ ||
838       (active_streams_.size() + created_streams_.size() - num_pushed_streams_ <
839        max_concurrent_streams_)) {
840     return CreateStream(*request, stream);
841   }
842
843   stalled_streams_++;
844   net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
845   RequestPriority priority = request->priority();
846   CHECK_GE(priority, MINIMUM_PRIORITY);
847   CHECK_LE(priority, MAXIMUM_PRIORITY);
848   pending_create_stream_queues_[priority].push_back(request);
849   return ERR_IO_PENDING;
850 }
851
852 int SpdySession::CreateStream(const SpdyStreamRequest& request,
853                               base::WeakPtr<SpdyStream>* stream) {
854   DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
855   DCHECK_LE(request.priority(), MAXIMUM_PRIORITY);
856
857   if (availability_state_ == STATE_GOING_AWAY)
858     return ERR_FAILED;
859
860   if (availability_state_ == STATE_DRAINING)
861     return ERR_CONNECTION_CLOSED;
862
863   Error err = TryAccessStream(request.url());
864   if (err != OK) {
865     // This should have been caught in TryCreateStream().
866     NOTREACHED();
867     return err;
868   }
869
870   DCHECK(connection_->socket());
871   DCHECK(connection_->socket()->IsConnected());
872   if (connection_->socket()) {
873     UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
874                           connection_->socket()->IsConnected());
875     if (!connection_->socket()->IsConnected()) {
876       DoDrainSession(
877           ERR_CONNECTION_CLOSED,
878           "Tried to create SPDY stream for a closed socket connection.");
879       return ERR_CONNECTION_CLOSED;
880     }
881   }
882
883   scoped_ptr<SpdyStream> new_stream(
884       new SpdyStream(request.type(), GetWeakPtr(), request.url(),
885                      request.priority(),
886                      stream_initial_send_window_size_,
887                      stream_initial_recv_window_size_,
888                      request.net_log()));
889   *stream = new_stream->GetWeakPtr();
890   InsertCreatedStream(new_stream.Pass());
891
892   UMA_HISTOGRAM_CUSTOM_COUNTS(
893       "Net.SpdyPriorityCount",
894       static_cast<int>(request.priority()), 0, 10, 11);
895
896   return OK;
897 }
898
899 void SpdySession::CancelStreamRequest(
900     const base::WeakPtr<SpdyStreamRequest>& request) {
901   DCHECK(request);
902   RequestPriority priority = request->priority();
903   CHECK_GE(priority, MINIMUM_PRIORITY);
904   CHECK_LE(priority, MAXIMUM_PRIORITY);
905
906 #if DCHECK_IS_ON
907   // |request| should not be in a queue not matching its priority.
908   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
909     if (priority == i)
910       continue;
911     PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
912     DCHECK(std::find_if(queue->begin(),
913                         queue->end(),
914                         RequestEquals(request)) == queue->end());
915   }
916 #endif
917
918   PendingStreamRequestQueue* queue =
919       &pending_create_stream_queues_[priority];
920   // Remove |request| from |queue| while preserving the order of the
921   // other elements.
922   PendingStreamRequestQueue::iterator it =
923       std::find_if(queue->begin(), queue->end(), RequestEquals(request));
924   // The request may already be removed if there's a
925   // CompleteStreamRequest() in flight.
926   if (it != queue->end()) {
927     it = queue->erase(it);
928     // |request| should be in the queue at most once, and if it is
929     // present, should not be pending completion.
930     DCHECK(std::find_if(it, queue->end(), RequestEquals(request)) ==
931            queue->end());
932   }
933 }
934
935 base::WeakPtr<SpdyStreamRequest> SpdySession::GetNextPendingStreamRequest() {
936   for (int j = MAXIMUM_PRIORITY; j >= MINIMUM_PRIORITY; --j) {
937     if (pending_create_stream_queues_[j].empty())
938       continue;
939
940     base::WeakPtr<SpdyStreamRequest> pending_request =
941         pending_create_stream_queues_[j].front();
942     DCHECK(pending_request);
943     pending_create_stream_queues_[j].pop_front();
944     return pending_request;
945   }
946   return base::WeakPtr<SpdyStreamRequest>();
947 }
948
949 void SpdySession::ProcessPendingStreamRequests() {
950   // Like |max_concurrent_streams_|, 0 means infinite for
951   // |max_requests_to_process|.
952   size_t max_requests_to_process = 0;
953   if (max_concurrent_streams_ != 0) {
954     max_requests_to_process =
955         max_concurrent_streams_ -
956         (active_streams_.size() + created_streams_.size());
957   }
958   for (size_t i = 0;
959        max_requests_to_process == 0 || i < max_requests_to_process; ++i) {
960     base::WeakPtr<SpdyStreamRequest> pending_request =
961         GetNextPendingStreamRequest();
962     if (!pending_request)
963       break;
964
965     // Note that this post can race with other stream creations, and it's
966     // possible that the un-stalled stream will be stalled again if it loses.
967     // TODO(jgraettinger): Provide stronger ordering guarantees.
968     base::MessageLoop::current()->PostTask(
969         FROM_HERE,
970         base::Bind(&SpdySession::CompleteStreamRequest,
971                    weak_factory_.GetWeakPtr(),
972                    pending_request));
973   }
974 }
975
976 void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) {
977   pooled_aliases_.insert(alias_key);
978 }
979
980 SpdyMajorVersion SpdySession::GetProtocolVersion() const {
981   DCHECK(buffered_spdy_framer_.get());
982   return buffered_spdy_framer_->protocol_version();
983 }
984
985 bool SpdySession::HasAcceptableTransportSecurity() const {
986   // If we're not even using TLS, we have no standards to meet.
987   if (!is_secure_) {
988     return true;
989   }
990
991   // We don't enforce transport security standards for older SPDY versions.
992   if (GetProtocolVersion() < SPDY4) {
993     return true;
994   }
995
996   SSLInfo ssl_info;
997   CHECK(connection_->socket()->GetSSLInfo(&ssl_info));
998
999   // HTTP/2 requires TLS 1.2+
1000   if (SSLConnectionStatusToVersion(ssl_info.connection_status) <
1001       SSL_CONNECTION_VERSION_TLS1_2) {
1002     return false;
1003   }
1004
1005   if (!IsSecureTLSCipherSuite(
1006           SSLConnectionStatusToCipherSuite(ssl_info.connection_status))) {
1007     return false;
1008   }
1009
1010   return true;
1011 }
1012
1013 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
1014   return weak_factory_.GetWeakPtr();
1015 }
1016
1017 bool SpdySession::CloseOneIdleConnection() {
1018   CHECK(!in_io_loop_);
1019   DCHECK(pool_);
1020   if (active_streams_.empty()) {
1021     DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1022   }
1023   // Return false as the socket wasn't immediately closed.
1024   return false;
1025 }
1026
1027 void SpdySession::EnqueueStreamWrite(
1028     const base::WeakPtr<SpdyStream>& stream,
1029     SpdyFrameType frame_type,
1030     scoped_ptr<SpdyBufferProducer> producer) {
1031   DCHECK(frame_type == HEADERS ||
1032          frame_type == DATA ||
1033          frame_type == CREDENTIAL ||
1034          frame_type == SYN_STREAM);
1035   EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
1036 }
1037
1038 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
1039     SpdyStreamId stream_id,
1040     RequestPriority priority,
1041     SpdyControlFlags flags,
1042     const SpdyHeaderBlock& block) {
1043   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
1044   CHECK(it != active_streams_.end());
1045   CHECK_EQ(it->second.stream->stream_id(), stream_id);
1046
1047   SendPrefacePingIfNoneInFlight();
1048
1049   DCHECK(buffered_spdy_framer_.get());
1050   SpdyPriority spdy_priority =
1051       ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion());
1052
1053   scoped_ptr<SpdyFrame> syn_frame;
1054   // TODO(hkhalil): Avoid copy of |block|.
1055   if (GetProtocolVersion() <= SPDY3) {
1056     SpdySynStreamIR syn_stream(stream_id);
1057     syn_stream.set_associated_to_stream_id(0);
1058     syn_stream.set_priority(spdy_priority);
1059     syn_stream.set_fin((flags & CONTROL_FLAG_FIN) != 0);
1060     syn_stream.set_unidirectional((flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0);
1061     syn_stream.set_name_value_block(block);
1062     syn_frame.reset(buffered_spdy_framer_->SerializeFrame(syn_stream));
1063   } else {
1064     SpdyHeadersIR headers(stream_id);
1065     headers.set_priority(spdy_priority);
1066     headers.set_has_priority(true);
1067     headers.set_fin((flags & CONTROL_FLAG_FIN) != 0);
1068     headers.set_name_value_block(block);
1069     syn_frame.reset(buffered_spdy_framer_->SerializeFrame(headers));
1070   }
1071
1072   base::StatsCounter spdy_requests("spdy.requests");
1073   spdy_requests.Increment();
1074   streams_initiated_count_++;
1075
1076   if (net_log().IsLogging()) {
1077     net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
1078                        base::Bind(&NetLogSpdySynStreamSentCallback,
1079                                   &block,
1080                                   (flags & CONTROL_FLAG_FIN) != 0,
1081                                   (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
1082                                   spdy_priority,
1083                                   stream_id));
1084   }
1085
1086   return syn_frame.Pass();
1087 }
1088
1089 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
1090                                                      IOBuffer* data,
1091                                                      int len,
1092                                                      SpdyDataFlags flags) {
1093   if (availability_state_ == STATE_DRAINING) {
1094     return scoped_ptr<SpdyBuffer>();
1095   }
1096
1097   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
1098   CHECK(it != active_streams_.end());
1099   SpdyStream* stream = it->second.stream;
1100   CHECK_EQ(stream->stream_id(), stream_id);
1101
1102   if (len < 0) {
1103     NOTREACHED();
1104     return scoped_ptr<SpdyBuffer>();
1105   }
1106
1107   int effective_len = std::min(len, kMaxSpdyFrameChunkSize);
1108
1109   bool send_stalled_by_stream =
1110       (flow_control_state_ >= FLOW_CONTROL_STREAM) &&
1111       (stream->send_window_size() <= 0);
1112   bool send_stalled_by_session = IsSendStalled();
1113
1114   // NOTE: There's an enum of the same name in histograms.xml.
1115   enum SpdyFrameFlowControlState {
1116     SEND_NOT_STALLED,
1117     SEND_STALLED_BY_STREAM,
1118     SEND_STALLED_BY_SESSION,
1119     SEND_STALLED_BY_STREAM_AND_SESSION,
1120   };
1121
1122   SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED;
1123   if (send_stalled_by_stream) {
1124     if (send_stalled_by_session) {
1125       frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION;
1126     } else {
1127       frame_flow_control_state = SEND_STALLED_BY_STREAM;
1128     }
1129   } else if (send_stalled_by_session) {
1130     frame_flow_control_state = SEND_STALLED_BY_SESSION;
1131   }
1132
1133   if (flow_control_state_ == FLOW_CONTROL_STREAM) {
1134     UMA_HISTOGRAM_ENUMERATION(
1135         "Net.SpdyFrameStreamFlowControlState",
1136         frame_flow_control_state,
1137         SEND_STALLED_BY_STREAM + 1);
1138   } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1139     UMA_HISTOGRAM_ENUMERATION(
1140         "Net.SpdyFrameStreamAndSessionFlowControlState",
1141         frame_flow_control_state,
1142         SEND_STALLED_BY_STREAM_AND_SESSION + 1);
1143   }
1144
1145   // Obey send window size of the stream if stream flow control is
1146   // enabled.
1147   if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
1148     if (send_stalled_by_stream) {
1149       stream->set_send_stalled_by_flow_control(true);
1150       // Even though we're currently stalled only by the stream, we
1151       // might end up being stalled by the session also.
1152       QueueSendStalledStream(*stream);
1153       net_log().AddEvent(
1154           NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW,
1155           NetLog::IntegerCallback("stream_id", stream_id));
1156       return scoped_ptr<SpdyBuffer>();
1157     }
1158
1159     effective_len = std::min(effective_len, stream->send_window_size());
1160   }
1161
1162   // Obey send window size of the session if session flow control is
1163   // enabled.
1164   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1165     if (send_stalled_by_session) {
1166       stream->set_send_stalled_by_flow_control(true);
1167       QueueSendStalledStream(*stream);
1168       net_log().AddEvent(
1169           NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW,
1170           NetLog::IntegerCallback("stream_id", stream_id));
1171       return scoped_ptr<SpdyBuffer>();
1172     }
1173
1174     effective_len = std::min(effective_len, session_send_window_size_);
1175   }
1176
1177   DCHECK_GE(effective_len, 0);
1178
1179   // Clear FIN flag if only some of the data will be in the data
1180   // frame.
1181   if (effective_len < len)
1182     flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
1183
1184   if (net_log().IsLogging()) {
1185     net_log().AddEvent(
1186         NetLog::TYPE_SPDY_SESSION_SEND_DATA,
1187         base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len,
1188                    (flags & DATA_FLAG_FIN) != 0));
1189   }
1190
1191   // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
1192   if (effective_len > 0)
1193     SendPrefacePingIfNoneInFlight();
1194
1195   // TODO(mbelshe): reduce memory copies here.
1196   DCHECK(buffered_spdy_framer_.get());
1197   scoped_ptr<SpdyFrame> frame(
1198       buffered_spdy_framer_->CreateDataFrame(
1199           stream_id, data->data(),
1200           static_cast<uint32>(effective_len), flags));
1201
1202   scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));
1203
1204   // Send window size is based on payload size, so nothing to do if this is
1205   // just a FIN with no payload.
1206   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION &&
1207       effective_len != 0) {
1208     DecreaseSendWindowSize(static_cast<int32>(effective_len));
1209     data_buffer->AddConsumeCallback(
1210         base::Bind(&SpdySession::OnWriteBufferConsumed,
1211                    weak_factory_.GetWeakPtr(),
1212                    static_cast<size_t>(effective_len)));
1213   }
1214
1215   return data_buffer.Pass();
1216 }
1217
1218 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) {
1219   DCHECK_NE(stream_id, 0u);
1220
1221   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1222   if (it == active_streams_.end()) {
1223     NOTREACHED();
1224     return;
1225   }
1226
1227   CloseActiveStreamIterator(it, status);
1228 }
1229
1230 void SpdySession::CloseCreatedStream(
1231     const base::WeakPtr<SpdyStream>& stream, int status) {
1232   DCHECK_EQ(stream->stream_id(), 0u);
1233
1234   CreatedStreamSet::iterator it = created_streams_.find(stream.get());
1235   if (it == created_streams_.end()) {
1236     NOTREACHED();
1237     return;
1238   }
1239
1240   CloseCreatedStreamIterator(it, status);
1241 }
1242
1243 void SpdySession::ResetStream(SpdyStreamId stream_id,
1244                               SpdyRstStreamStatus status,
1245                               const std::string& description) {
1246   DCHECK_NE(stream_id, 0u);
1247
1248   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1249   if (it == active_streams_.end()) {
1250     NOTREACHED();
1251     return;
1252   }
1253
1254   ResetStreamIterator(it, status, description);
1255 }
1256
1257 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
1258   return ContainsKey(active_streams_, stream_id);
1259 }
1260
1261 LoadState SpdySession::GetLoadState() const {
1262   // Just report that we're idle since the session could be doing
1263   // many things concurrently.
1264   return LOAD_STATE_IDLE;
1265 }
1266
1267 void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
1268                                             int status) {
1269   // TODO(mbelshe): We should send a RST_STREAM control frame here
1270   //                so that the server can cancel a large send.
1271
1272   scoped_ptr<SpdyStream> owned_stream(it->second.stream);
1273   active_streams_.erase(it);
1274
1275   // TODO(akalin): When SpdyStream was ref-counted (and
1276   // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
1277   // was only done when status was not OK. This meant that pushed
1278   // streams can still be claimed after they're closed. This is
1279   // probably something that we still want to support, although server
1280   // push is hardly used. Write tests for this and fix this. (See
1281   // http://crbug.com/261712 .)
1282   if (owned_stream->type() == SPDY_PUSH_STREAM) {
1283     unclaimed_pushed_streams_.erase(owned_stream->url());
1284     num_pushed_streams_--;
1285     if (!owned_stream->IsReservedRemote())
1286       num_active_pushed_streams_--;
1287   }
1288
1289   DeleteStream(owned_stream.Pass(), status);
1290   MaybeFinishGoingAway();
1291
1292   // If there are no active streams and the socket pool is stalled, close the
1293   // session to free up a socket slot.
1294   if (active_streams_.empty() && connection_->IsPoolStalled()) {
1295     DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1296   }
1297 }
1298
1299 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
1300                                              int status) {
1301   scoped_ptr<SpdyStream> owned_stream(*it);
1302   created_streams_.erase(it);
1303   DeleteStream(owned_stream.Pass(), status);
1304 }
1305
1306 void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it,
1307                                       SpdyRstStreamStatus status,
1308                                       const std::string& description) {
1309   // Send the RST_STREAM frame first as CloseActiveStreamIterator()
1310   // may close us.
1311   SpdyStreamId stream_id = it->first;
1312   RequestPriority priority = it->second.stream->priority();
1313   EnqueueResetStreamFrame(stream_id, priority, status, description);
1314
1315   // Removes any pending writes for the stream except for possibly an
1316   // in-flight one.
1317   CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
1318 }
1319
1320 void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id,
1321                                           RequestPriority priority,
1322                                           SpdyRstStreamStatus status,
1323                                           const std::string& description) {
1324   DCHECK_NE(stream_id, 0u);
1325
1326   net_log().AddEvent(
1327       NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
1328       base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));
1329
1330   DCHECK(buffered_spdy_framer_.get());
1331   scoped_ptr<SpdyFrame> rst_frame(
1332       buffered_spdy_framer_->CreateRstStream(stream_id, status));
1333
1334   EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
1335   RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status));
1336 }
1337
1338 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
1339   CHECK(!in_io_loop_);
1340   if (availability_state_ == STATE_DRAINING) {
1341     return;
1342   }
1343   ignore_result(DoReadLoop(expected_read_state, result));
1344 }
1345
1346 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
1347   CHECK(!in_io_loop_);
1348   CHECK_EQ(read_state_, expected_read_state);
1349
1350   in_io_loop_ = true;
1351
1352   int bytes_read_without_yielding = 0;
1353
1354   // Loop until the session is draining, the read becomes blocked, or
1355   // the read limit is exceeded.
1356   while (true) {
1357     switch (read_state_) {
1358       case READ_STATE_DO_READ:
1359         CHECK_EQ(result, OK);
1360         result = DoRead();
1361         break;
1362       case READ_STATE_DO_READ_COMPLETE:
1363         if (result > 0)
1364           bytes_read_without_yielding += result;
1365         result = DoReadComplete(result);
1366         break;
1367       default:
1368         NOTREACHED() << "read_state_: " << read_state_;
1369         break;
1370     }
1371
1372     if (availability_state_ == STATE_DRAINING)
1373       break;
1374
1375     if (result == ERR_IO_PENDING)
1376       break;
1377
1378     if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
1379       read_state_ = READ_STATE_DO_READ;
1380       base::MessageLoop::current()->PostTask(
1381           FROM_HERE,
1382           base::Bind(&SpdySession::PumpReadLoop,
1383                      weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
1384       result = ERR_IO_PENDING;
1385       break;
1386     }
1387   }
1388
1389   CHECK(in_io_loop_);
1390   in_io_loop_ = false;
1391
1392   return result;
1393 }
1394
1395 int SpdySession::DoRead() {
1396   CHECK(in_io_loop_);
1397
1398   CHECK(connection_);
1399   CHECK(connection_->socket());
1400   read_state_ = READ_STATE_DO_READ_COMPLETE;
1401   return connection_->socket()->Read(
1402       read_buffer_.get(),
1403       kReadBufferSize,
1404       base::Bind(&SpdySession::PumpReadLoop,
1405                  weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
1406 }
1407
1408 int SpdySession::DoReadComplete(int result) {
1409   CHECK(in_io_loop_);
1410
1411   // Parse a frame.  For now this code requires that the frame fit into our
1412   // buffer (kReadBufferSize).
1413   // TODO(mbelshe): support arbitrarily large frames!
1414
1415   if (result == 0) {
1416     UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
1417                                 total_bytes_received_, 1, 100000000, 50);
1418     DoDrainSession(ERR_CONNECTION_CLOSED, "Connection closed");
1419
1420     return ERR_CONNECTION_CLOSED;
1421   }
1422
1423   if (result < 0) {
1424     DoDrainSession(static_cast<Error>(result), "result is < 0.");
1425     return result;
1426   }
1427   CHECK_LE(result, kReadBufferSize);
1428   total_bytes_received_ += result;
1429
1430   last_activity_time_ = time_func_();
1431
1432   DCHECK(buffered_spdy_framer_.get());
1433   char* data = read_buffer_->data();
1434   while (result > 0) {
1435     uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
1436     result -= bytes_processed;
1437     data += bytes_processed;
1438
1439     if (availability_state_ == STATE_DRAINING) {
1440       return ERR_CONNECTION_CLOSED;
1441     }
1442
1443     DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
1444   }
1445
1446   read_state_ = READ_STATE_DO_READ;
1447   return OK;
1448 }
1449
1450 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
1451   CHECK(!in_io_loop_);
1452   DCHECK_EQ(write_state_, expected_write_state);
1453
1454   DoWriteLoop(expected_write_state, result);
1455
1456   if (availability_state_ == STATE_DRAINING && !in_flight_write_ &&
1457       write_queue_.IsEmpty()) {
1458     pool_->RemoveUnavailableSession(GetWeakPtr());  // Destroys |this|.
1459     return;
1460   }
1461 }
1462
1463 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
1464   CHECK(!in_io_loop_);
1465   DCHECK_NE(write_state_, WRITE_STATE_IDLE);
1466   DCHECK_EQ(write_state_, expected_write_state);
1467
1468   in_io_loop_ = true;
1469
1470   // Loop until the session is closed or the write becomes blocked.
1471   while (true) {
1472     switch (write_state_) {
1473       case WRITE_STATE_DO_WRITE:
1474         DCHECK_EQ(result, OK);
1475         result = DoWrite();
1476         break;
1477       case WRITE_STATE_DO_WRITE_COMPLETE:
1478         result = DoWriteComplete(result);
1479         break;
1480       case WRITE_STATE_IDLE:
1481       default:
1482         NOTREACHED() << "write_state_: " << write_state_;
1483         break;
1484     }
1485
1486     if (write_state_ == WRITE_STATE_IDLE) {
1487       DCHECK_EQ(result, ERR_IO_PENDING);
1488       break;
1489     }
1490
1491     if (result == ERR_IO_PENDING)
1492       break;
1493   }
1494
1495   CHECK(in_io_loop_);
1496   in_io_loop_ = false;
1497
1498   return result;
1499 }
1500
1501 int SpdySession::DoWrite() {
1502   CHECK(in_io_loop_);
1503
1504   DCHECK(buffered_spdy_framer_);
1505   if (in_flight_write_) {
1506     DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1507   } else {
1508     // Grab the next frame to send.
1509     SpdyFrameType frame_type = DATA;
1510     scoped_ptr<SpdyBufferProducer> producer;
1511     base::WeakPtr<SpdyStream> stream;
1512     if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
1513       write_state_ = WRITE_STATE_IDLE;
1514       return ERR_IO_PENDING;
1515     }
1516
1517     if (stream.get())
1518       CHECK(!stream->IsClosed());
1519
1520     // Activate the stream only when sending the SYN_STREAM frame to
1521     // guarantee monotonically-increasing stream IDs.
1522     if (frame_type == SYN_STREAM) {
1523       CHECK(stream.get());
1524       CHECK_EQ(stream->stream_id(), 0u);
1525       scoped_ptr<SpdyStream> owned_stream =
1526           ActivateCreatedStream(stream.get());
1527       InsertActivatedStream(owned_stream.Pass());
1528
1529       if (stream_hi_water_mark_ > kLastStreamId) {
1530         CHECK_EQ(stream->stream_id(), kLastStreamId);
1531         // We've exhausted the stream ID space, and no new streams may be
1532         // created after this one.
1533         MakeUnavailable();
1534         StartGoingAway(kLastStreamId, ERR_ABORTED);
1535       }
1536     }
1537
1538     in_flight_write_ = producer->ProduceBuffer();
1539     if (!in_flight_write_) {
1540       NOTREACHED();
1541       return ERR_UNEXPECTED;
1542     }
1543     in_flight_write_frame_type_ = frame_type;
1544     in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
1545     DCHECK_GE(in_flight_write_frame_size_,
1546               buffered_spdy_framer_->GetFrameMinimumSize());
1547     in_flight_write_stream_ = stream;
1548   }
1549
1550   write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;
1551
1552   // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
1553   // with Socket implementations that don't store their IOBuffer
1554   // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
1555   scoped_refptr<IOBuffer> write_io_buffer =
1556       in_flight_write_->GetIOBufferForRemainingData();
1557   return connection_->socket()->Write(
1558       write_io_buffer.get(),
1559       in_flight_write_->GetRemainingSize(),
1560       base::Bind(&SpdySession::PumpWriteLoop,
1561                  weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
1562 }
1563
1564 int SpdySession::DoWriteComplete(int result) {
1565   CHECK(in_io_loop_);
1566   DCHECK_NE(result, ERR_IO_PENDING);
1567   DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1568
1569   last_activity_time_ = time_func_();
1570
1571   if (result < 0) {
1572     DCHECK_NE(result, ERR_IO_PENDING);
1573     in_flight_write_.reset();
1574     in_flight_write_frame_type_ = DATA;
1575     in_flight_write_frame_size_ = 0;
1576     in_flight_write_stream_.reset();
1577     write_state_ = WRITE_STATE_DO_WRITE;
1578     DoDrainSession(static_cast<Error>(result), "Write error");
1579     return OK;
1580   }
1581
1582   // It should not be possible to have written more bytes than our
1583   // in_flight_write_.
1584   DCHECK_LE(static_cast<size_t>(result),
1585             in_flight_write_->GetRemainingSize());
1586
1587   if (result > 0) {
1588     in_flight_write_->Consume(static_cast<size_t>(result));
1589
1590     // We only notify the stream when we've fully written the pending frame.
1591     if (in_flight_write_->GetRemainingSize() == 0) {
1592       // It is possible that the stream was cancelled while we were
1593       // writing to the socket.
1594       if (in_flight_write_stream_.get()) {
1595         DCHECK_GT(in_flight_write_frame_size_, 0u);
1596         in_flight_write_stream_->OnFrameWriteComplete(
1597             in_flight_write_frame_type_,
1598             in_flight_write_frame_size_);
1599       }
1600
1601       // Cleanup the write which just completed.
1602       in_flight_write_.reset();
1603       in_flight_write_frame_type_ = DATA;
1604       in_flight_write_frame_size_ = 0;
1605       in_flight_write_stream_.reset();
1606     }
1607   }
1608
1609   write_state_ = WRITE_STATE_DO_WRITE;
1610   return OK;
1611 }
1612
1613 void SpdySession::DcheckGoingAway() const {
1614 #if DCHECK_IS_ON
1615   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1616   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
1617     DCHECK(pending_create_stream_queues_[i].empty());
1618   }
1619   DCHECK(created_streams_.empty());
1620 #endif
1621 }
1622
1623 void SpdySession::DcheckDraining() const {
1624   DcheckGoingAway();
1625   DCHECK_EQ(availability_state_, STATE_DRAINING);
1626   DCHECK(active_streams_.empty());
1627   DCHECK(unclaimed_pushed_streams_.empty());
1628 }
1629
1630 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
1631                                  Error status) {
1632   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1633
1634   // The loops below are carefully written to avoid reentrancy problems.
1635
1636   while (true) {
1637     size_t old_size = GetTotalSize(pending_create_stream_queues_);
1638     base::WeakPtr<SpdyStreamRequest> pending_request =
1639         GetNextPendingStreamRequest();
1640     if (!pending_request)
1641       break;
1642     // No new stream requests should be added while the session is
1643     // going away.
1644     DCHECK_GT(old_size, GetTotalSize(pending_create_stream_queues_));
1645     pending_request->OnRequestCompleteFailure(ERR_ABORTED);
1646   }
1647
1648   while (true) {
1649     size_t old_size = active_streams_.size();
1650     ActiveStreamMap::iterator it =
1651         active_streams_.lower_bound(last_good_stream_id + 1);
1652     if (it == active_streams_.end())
1653       break;
1654     LogAbandonedActiveStream(it, status);
1655     CloseActiveStreamIterator(it, status);
1656     // No new streams should be activated while the session is going
1657     // away.
1658     DCHECK_GT(old_size, active_streams_.size());
1659   }
1660
1661   while (!created_streams_.empty()) {
1662     size_t old_size = created_streams_.size();
1663     CreatedStreamSet::iterator it = created_streams_.begin();
1664     LogAbandonedStream(*it, status);
1665     CloseCreatedStreamIterator(it, status);
1666     // No new streams should be created while the session is going
1667     // away.
1668     DCHECK_GT(old_size, created_streams_.size());
1669   }
1670
1671   write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);
1672
1673   DcheckGoingAway();
1674 }
1675
1676 void SpdySession::MaybeFinishGoingAway() {
1677   if (active_streams_.empty() && availability_state_ == STATE_GOING_AWAY) {
1678     DoDrainSession(OK, "Finished going away");
1679   }
1680 }
1681
1682 void SpdySession::DoDrainSession(Error err, const std::string& description) {
1683   if (availability_state_ == STATE_DRAINING) {
1684     return;
1685   }
1686   MakeUnavailable();
1687
1688   // If |err| indicates an error occurred, inform the peer that we're closing
1689   // and why. Don't GOAWAY on a graceful or idle close, as that may
1690   // unnecessarily wake the radio. We could technically GOAWAY on network errors
1691   // (we'll probably fail to actually write it, but that's okay), however many
1692   // unit-tests would need to be updated.
1693   if (err != OK &&
1694       err != ERR_ABORTED &&  // Used by SpdySessionPool to close idle sessions.
1695       err != ERR_NETWORK_CHANGED &&  // Used to deprecate sessions on IP change.
1696       err != ERR_SOCKET_NOT_CONNECTED &&
1697       err != ERR_CONNECTION_CLOSED && err != ERR_CONNECTION_RESET) {
1698     // Enqueue a GOAWAY to inform the peer of why we're closing the connection.
1699     SpdyGoAwayIR goaway_ir(last_accepted_push_stream_id_,
1700                            MapNetErrorToGoAwayStatus(err),
1701                            description);
1702     EnqueueSessionWrite(HIGHEST,
1703                         GOAWAY,
1704                         scoped_ptr<SpdyFrame>(
1705                             buffered_spdy_framer_->SerializeFrame(goaway_ir)));
1706   }
1707
1708   availability_state_ = STATE_DRAINING;
1709   error_on_close_ = err;
1710
1711   net_log_.AddEvent(
1712       NetLog::TYPE_SPDY_SESSION_CLOSE,
1713       base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
1714
1715   UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
1716   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
1717                               total_bytes_received_, 1, 100000000, 50);
1718
1719   if (err == OK) {
1720     // We ought to be going away already, as this is a graceful close.
1721     DcheckGoingAway();
1722   } else {
1723     StartGoingAway(0, err);
1724   }
1725   DcheckDraining();
1726   MaybePostWriteLoop();
1727 }
1728
1729 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
1730   DCHECK(stream);
1731   std::string description = base::StringPrintf(
1732       "ABANDONED (stream_id=%d): ", stream->stream_id()) +
1733       stream->url().spec();
1734   stream->LogStreamError(status, description);
1735   // We don't increment the streams abandoned counter here. If the
1736   // stream isn't active (i.e., it hasn't written anything to the wire
1737   // yet) then it's as if it never existed. If it is active, then
1738   // LogAbandonedActiveStream() will increment the counters.
1739 }
1740
1741 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it,
1742                                            Error status) {
1743   DCHECK_GT(it->first, 0u);
1744   LogAbandonedStream(it->second.stream, status);
1745   ++streams_abandoned_count_;
1746   base::StatsCounter abandoned_streams("spdy.abandoned_streams");
1747   abandoned_streams.Increment();
1748   if (it->second.stream->type() == SPDY_PUSH_STREAM &&
1749       unclaimed_pushed_streams_.find(it->second.stream->url()) !=
1750       unclaimed_pushed_streams_.end()) {
1751     base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams");
1752     abandoned_push_streams.Increment();
1753   }
1754 }
1755
1756 SpdyStreamId SpdySession::GetNewStreamId() {
1757   CHECK_LE(stream_hi_water_mark_, kLastStreamId);
1758   SpdyStreamId id = stream_hi_water_mark_;
1759   stream_hi_water_mark_ += 2;
1760   return id;
1761 }
1762
1763 void SpdySession::CloseSessionOnError(Error err,
1764                                       const std::string& description) {
1765   DCHECK_LT(err, ERR_IO_PENDING);
1766   DoDrainSession(err, description);
1767 }
1768
1769 void SpdySession::MakeUnavailable() {
1770   if (availability_state_ == STATE_AVAILABLE) {
1771     availability_state_ = STATE_GOING_AWAY;
1772     pool_->MakeSessionUnavailable(GetWeakPtr());
1773   }
1774 }
1775
1776 base::Value* SpdySession::GetInfoAsValue() const {
1777   base::DictionaryValue* dict = new base::DictionaryValue();
1778
1779   dict->SetInteger("source_id", net_log_.source().id);
1780
1781   dict->SetString("host_port_pair", host_port_pair().ToString());
1782   if (!pooled_aliases_.empty()) {
1783     base::ListValue* alias_list = new base::ListValue();
1784     for (std::set<SpdySessionKey>::const_iterator it =
1785              pooled_aliases_.begin();
1786          it != pooled_aliases_.end(); it++) {
1787       alias_list->Append(new base::StringValue(
1788           it->host_port_pair().ToString()));
1789     }
1790     dict->Set("aliases", alias_list);
1791   }
1792   dict->SetString("proxy", host_port_proxy_pair().second.ToURI());
1793
1794   dict->SetInteger("active_streams", active_streams_.size());
1795
1796   dict->SetInteger("unclaimed_pushed_streams",
1797                    unclaimed_pushed_streams_.size());
1798
1799   dict->SetBoolean("is_secure", is_secure_);
1800
1801   dict->SetString("protocol_negotiated",
1802                   SSLClientSocket::NextProtoToString(
1803                       connection_->socket()->GetNegotiatedProtocol()));
1804
1805   dict->SetInteger("error", error_on_close_);
1806   dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
1807
1808   dict->SetInteger("streams_initiated_count", streams_initiated_count_);
1809   dict->SetInteger("streams_pushed_count", streams_pushed_count_);
1810   dict->SetInteger("streams_pushed_and_claimed_count",
1811       streams_pushed_and_claimed_count_);
1812   dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
1813   DCHECK(buffered_spdy_framer_.get());
1814   dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received());
1815
1816   dict->SetBoolean("sent_settings", sent_settings_);
1817   dict->SetBoolean("received_settings", received_settings_);
1818
1819   dict->SetInteger("send_window_size", session_send_window_size_);
1820   dict->SetInteger("recv_window_size", session_recv_window_size_);
1821   dict->SetInteger("unacked_recv_window_bytes",
1822                    session_unacked_recv_window_bytes_);
1823   return dict;
1824 }
1825
1826 bool SpdySession::IsReused() const {
1827   return buffered_spdy_framer_->frames_received() > 0 ||
1828       connection_->reuse_type() == ClientSocketHandle::UNUSED_IDLE;
1829 }
1830
1831 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id,
1832                                     LoadTimingInfo* load_timing_info) const {
1833   return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId,
1834                                         load_timing_info);
1835 }
1836
1837 int SpdySession::GetPeerAddress(IPEndPoint* address) const {
1838   int rv = ERR_SOCKET_NOT_CONNECTED;
1839   if (connection_->socket()) {
1840     rv = connection_->socket()->GetPeerAddress(address);
1841   }
1842
1843   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress",
1844                         rv == ERR_SOCKET_NOT_CONNECTED);
1845
1846   return rv;
1847 }
1848
1849 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
1850   int rv = ERR_SOCKET_NOT_CONNECTED;
1851   if (connection_->socket()) {
1852     rv = connection_->socket()->GetLocalAddress(address);
1853   }
1854
1855   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress",
1856                         rv == ERR_SOCKET_NOT_CONNECTED);
1857
1858   return rv;
1859 }
1860
1861 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
1862                                       SpdyFrameType frame_type,
1863                                       scoped_ptr<SpdyFrame> frame) {
1864   DCHECK(frame_type == RST_STREAM || frame_type == SETTINGS ||
1865          frame_type == WINDOW_UPDATE || frame_type == PING ||
1866          frame_type == GOAWAY);
1867   EnqueueWrite(
1868       priority, frame_type,
1869       scoped_ptr<SpdyBufferProducer>(
1870           new SimpleBufferProducer(
1871               scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
1872       base::WeakPtr<SpdyStream>());
1873 }
1874
1875 void SpdySession::EnqueueWrite(RequestPriority priority,
1876                                SpdyFrameType frame_type,
1877                                scoped_ptr<SpdyBufferProducer> producer,
1878                                const base::WeakPtr<SpdyStream>& stream) {
1879   if (availability_state_ == STATE_DRAINING)
1880     return;
1881
1882   write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
1883   MaybePostWriteLoop();
1884 }
1885
1886 void SpdySession::MaybePostWriteLoop() {
1887   if (write_state_ == WRITE_STATE_IDLE) {
1888     CHECK(!in_flight_write_);
1889     write_state_ = WRITE_STATE_DO_WRITE;
1890     base::MessageLoop::current()->PostTask(
1891         FROM_HERE,
1892         base::Bind(&SpdySession::PumpWriteLoop,
1893                    weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
1894   }
1895 }
1896
1897 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
1898   CHECK_EQ(stream->stream_id(), 0u);
1899   CHECK(created_streams_.find(stream.get()) == created_streams_.end());
1900   created_streams_.insert(stream.release());
1901 }
1902
1903 scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) {
1904   CHECK_EQ(stream->stream_id(), 0u);
1905   CHECK(created_streams_.find(stream) != created_streams_.end());
1906   stream->set_stream_id(GetNewStreamId());
1907   scoped_ptr<SpdyStream> owned_stream(stream);
1908   created_streams_.erase(stream);
1909   return owned_stream.Pass();
1910 }
1911
1912 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) {
1913   SpdyStreamId stream_id = stream->stream_id();
1914   CHECK_NE(stream_id, 0u);
1915   std::pair<ActiveStreamMap::iterator, bool> result =
1916       active_streams_.insert(
1917           std::make_pair(stream_id, ActiveStreamInfo(stream.get())));
1918   CHECK(result.second);
1919   ignore_result(stream.release());
1920 }
1921
1922 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
1923   if (in_flight_write_stream_.get() == stream.get()) {
1924     // If we're deleting the stream for the in-flight write, we still
1925     // need to let the write complete, so we clear
1926     // |in_flight_write_stream_| and let the write finish on its own
1927     // without notifying |in_flight_write_stream_|.
1928     in_flight_write_stream_.reset();
1929   }
1930
1931   write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
1932   stream->OnClose(status);
1933
1934   if (availability_state_ == STATE_AVAILABLE) {
1935     ProcessPendingStreamRequests();
1936   }
1937 }
1938
1939 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
1940   base::StatsCounter used_push_streams("spdy.claimed_push_streams");
1941
1942   PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
1943   if (unclaimed_it == unclaimed_pushed_streams_.end())
1944     return base::WeakPtr<SpdyStream>();
1945
1946   SpdyStreamId stream_id = unclaimed_it->second.stream_id;
1947   unclaimed_pushed_streams_.erase(unclaimed_it);
1948
1949   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
1950   if (active_it == active_streams_.end()) {
1951     NOTREACHED();
1952     return base::WeakPtr<SpdyStream>();
1953   }
1954
1955   net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM);
1956   used_push_streams.Increment();
1957   return active_it->second.stream->GetWeakPtr();
1958 }
1959
1960 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info,
1961                              bool* was_npn_negotiated,
1962                              NextProto* protocol_negotiated) {
1963   *was_npn_negotiated = connection_->socket()->WasNpnNegotiated();
1964   *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol();
1965   return connection_->socket()->GetSSLInfo(ssl_info);
1966 }
1967
1968 bool SpdySession::GetSSLCertRequestInfo(
1969     SSLCertRequestInfo* cert_request_info) {
1970   if (!is_secure_)
1971     return false;
1972   GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
1973   return true;
1974 }
1975
1976 void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
1977   CHECK(in_io_loop_);
1978
1979   RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code));
1980   std::string description =
1981       base::StringPrintf("Framer error: %d (%s).",
1982                          error_code,
1983                          SpdyFramer::ErrorCodeToString(error_code));
1984   DoDrainSession(MapFramerErrorToNetError(error_code), description);
1985 }
1986
1987 void SpdySession::OnStreamError(SpdyStreamId stream_id,
1988                                 const std::string& description) {
1989   CHECK(in_io_loop_);
1990
1991   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1992   if (it == active_streams_.end()) {
1993     // We still want to send a frame to reset the stream even if we
1994     // don't know anything about it.
1995     EnqueueResetStreamFrame(
1996         stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
1997     return;
1998   }
1999
2000   ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
2001 }
2002
2003 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id,
2004                                     size_t length,
2005                                     bool fin) {
2006   CHECK(in_io_loop_);
2007
2008   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2009
2010   // By the time data comes in, the stream may already be inactive.
2011   if (it == active_streams_.end())
2012     return;
2013
2014   SpdyStream* stream = it->second.stream;
2015   CHECK_EQ(stream->stream_id(), stream_id);
2016
2017   DCHECK(buffered_spdy_framer_);
2018   size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize();
2019   stream->IncrementRawReceivedBytes(header_len);
2020 }
2021
2022 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
2023                                     const char* data,
2024                                     size_t len,
2025                                     bool fin) {
2026   CHECK(in_io_loop_);
2027
2028   if (data == NULL && len != 0) {
2029     // This is notification of consumed data padding.
2030     // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames.
2031     // See crbug.com/353012.
2032     return;
2033   }
2034
2035   DCHECK_LT(len, 1u << 24);
2036   if (net_log().IsLogging()) {
2037     net_log().AddEvent(
2038         NetLog::TYPE_SPDY_SESSION_RECV_DATA,
2039         base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
2040   }
2041
2042   // Build the buffer as early as possible so that we go through the
2043   // session flow control checks and update
2044   // |unacked_recv_window_bytes_| properly even when the stream is
2045   // inactive (since the other side has still reduced its session send
2046   // window).
2047   scoped_ptr<SpdyBuffer> buffer;
2048   if (data) {
2049     DCHECK_GT(len, 0u);
2050     CHECK_LE(len, static_cast<size_t>(kReadBufferSize));
2051     buffer.reset(new SpdyBuffer(data, len));
2052
2053     if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2054       DecreaseRecvWindowSize(static_cast<int32>(len));
2055       buffer->AddConsumeCallback(
2056           base::Bind(&SpdySession::OnReadBufferConsumed,
2057                      weak_factory_.GetWeakPtr()));
2058     }
2059   } else {
2060     DCHECK_EQ(len, 0u);
2061   }
2062
2063   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2064
2065   // By the time data comes in, the stream may already be inactive.
2066   if (it == active_streams_.end())
2067     return;
2068
2069   SpdyStream* stream = it->second.stream;
2070   CHECK_EQ(stream->stream_id(), stream_id);
2071
2072   stream->IncrementRawReceivedBytes(len);
2073
2074   if (it->second.waiting_for_syn_reply) {
2075     const std::string& error = "Data received before SYN_REPLY.";
2076     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2077     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2078     return;
2079   }
2080
2081   stream->OnDataReceived(buffer.Pass());
2082 }
2083
2084 void SpdySession::OnSettings(bool clear_persisted) {
2085   CHECK(in_io_loop_);
2086
2087   if (clear_persisted)
2088     http_server_properties_->ClearSpdySettings(host_port_pair());
2089
2090   if (net_log_.IsLogging()) {
2091     net_log_.AddEvent(
2092         NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
2093         base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
2094                    clear_persisted));
2095   }
2096
2097   if (GetProtocolVersion() >= SPDY4) {
2098     // Send an acknowledgment of the setting.
2099     SpdySettingsIR settings_ir;
2100     settings_ir.set_is_ack(true);
2101     EnqueueSessionWrite(
2102         HIGHEST,
2103         SETTINGS,
2104         scoped_ptr<SpdyFrame>(
2105             buffered_spdy_framer_->SerializeFrame(settings_ir)));
2106   }
2107 }
2108
2109 void SpdySession::OnSetting(SpdySettingsIds id,
2110                             uint8 flags,
2111                             uint32 value) {
2112   CHECK(in_io_loop_);
2113
2114   HandleSetting(id, value);
2115   http_server_properties_->SetSpdySetting(
2116       host_port_pair(),
2117       id,
2118       static_cast<SpdySettingsFlags>(flags),
2119       value);
2120   received_settings_ = true;
2121
2122   // Log the setting.
2123   const SpdyMajorVersion protocol_version = GetProtocolVersion();
2124   net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_SETTING,
2125                     base::Bind(&NetLogSpdySettingCallback,
2126                                id,
2127                                protocol_version,
2128                                static_cast<SpdySettingsFlags>(flags),
2129                                value));
2130 }
2131
2132 void SpdySession::OnSendCompressedFrame(
2133     SpdyStreamId stream_id,
2134     SpdyFrameType type,
2135     size_t payload_len,
2136     size_t frame_len) {
2137   if (type != SYN_STREAM && type != HEADERS)
2138     return;
2139
2140   DCHECK(buffered_spdy_framer_.get());
2141   size_t compressed_len =
2142       frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize();
2143
2144   if (payload_len) {
2145     // Make sure we avoid early decimal truncation.
2146     int compression_pct = 100 - (100 * compressed_len) / payload_len;
2147     UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
2148                              compression_pct);
2149   }
2150 }
2151
2152 void SpdySession::OnReceiveCompressedFrame(
2153     SpdyStreamId stream_id,
2154     SpdyFrameType type,
2155     size_t frame_len) {
2156   last_compressed_frame_len_ = frame_len;
2157 }
2158
2159 int SpdySession::OnInitialResponseHeadersReceived(
2160     const SpdyHeaderBlock& response_headers,
2161     base::Time response_time,
2162     base::TimeTicks recv_first_byte_time,
2163     SpdyStream* stream) {
2164   CHECK(in_io_loop_);
2165   SpdyStreamId stream_id = stream->stream_id();
2166
2167   if (stream->type() == SPDY_PUSH_STREAM) {
2168     DCHECK(stream->IsReservedRemote());
2169     if (max_concurrent_pushed_streams_ &&
2170         num_active_pushed_streams_ >= max_concurrent_pushed_streams_) {
2171       ResetStream(stream_id,
2172                   RST_STREAM_REFUSED_STREAM,
2173                   "Stream concurrency limit reached.");
2174       return STATUS_CODE_REFUSED_STREAM;
2175     }
2176   }
2177
2178   if (stream->type() == SPDY_PUSH_STREAM) {
2179     // Will be balanced in DeleteStream.
2180     num_active_pushed_streams_++;
2181   }
2182
2183   // May invalidate |stream|.
2184   int rv = stream->OnInitialResponseHeadersReceived(
2185       response_headers, response_time, recv_first_byte_time);
2186   if (rv < 0) {
2187     DCHECK_NE(rv, ERR_IO_PENDING);
2188     DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2189   }
2190
2191   return rv;
2192 }
2193
2194 void SpdySession::OnSynStream(SpdyStreamId stream_id,
2195                               SpdyStreamId associated_stream_id,
2196                               SpdyPriority priority,
2197                               bool fin,
2198                               bool unidirectional,
2199                               const SpdyHeaderBlock& headers) {
2200   CHECK(in_io_loop_);
2201
2202   if (GetProtocolVersion() >= SPDY4) {
2203     DCHECK_EQ(0u, associated_stream_id);
2204     OnHeaders(stream_id, fin, headers);
2205     return;
2206   }
2207
2208   base::Time response_time = base::Time::Now();
2209   base::TimeTicks recv_first_byte_time = time_func_();
2210
2211   if (net_log_.IsLogging()) {
2212     net_log_.AddEvent(
2213         NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
2214         base::Bind(&NetLogSpdySynStreamReceivedCallback,
2215                    &headers, fin, unidirectional, priority,
2216                    stream_id, associated_stream_id));
2217   }
2218
2219   // Split headers to simulate push promise and response.
2220   SpdyHeaderBlock request_headers;
2221   SpdyHeaderBlock response_headers;
2222   SplitPushedHeadersToRequestAndResponse(
2223       headers, GetProtocolVersion(), &request_headers, &response_headers);
2224
2225   if (!TryCreatePushStream(
2226           stream_id, associated_stream_id, priority, request_headers))
2227     return;
2228
2229   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2230   if (active_it == active_streams_.end()) {
2231     NOTREACHED();
2232     return;
2233   }
2234
2235   if (OnInitialResponseHeadersReceived(response_headers,
2236                                        response_time,
2237                                        recv_first_byte_time,
2238                                        active_it->second.stream) != OK)
2239     return;
2240
2241   base::StatsCounter push_requests("spdy.pushed_streams");
2242   push_requests.Increment();
2243 }
2244
2245 void SpdySession::DeleteExpiredPushedStreams() {
2246   if (unclaimed_pushed_streams_.empty())
2247     return;
2248
2249   // Check that adequate time has elapsed since the last sweep.
2250   if (time_func_() < next_unclaimed_push_stream_sweep_time_)
2251     return;
2252
2253   // Gather old streams to delete.
2254   base::TimeTicks minimum_freshness = time_func_() -
2255       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2256   std::vector<SpdyStreamId> streams_to_close;
2257   for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
2258        it != unclaimed_pushed_streams_.end(); ++it) {
2259     if (minimum_freshness > it->second.creation_time)
2260       streams_to_close.push_back(it->second.stream_id);
2261   }
2262
2263   for (std::vector<SpdyStreamId>::const_iterator to_close_it =
2264            streams_to_close.begin();
2265        to_close_it != streams_to_close.end(); ++to_close_it) {
2266     ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it);
2267     if (active_it == active_streams_.end())
2268       continue;
2269
2270     LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM);
2271     // CloseActiveStreamIterator() will remove the stream from
2272     // |unclaimed_pushed_streams_|.
2273     ResetStreamIterator(
2274         active_it, RST_STREAM_REFUSED_STREAM, "Stream not claimed.");
2275   }
2276
2277   next_unclaimed_push_stream_sweep_time_ = time_func_() +
2278       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2279 }
2280
2281 void SpdySession::OnSynReply(SpdyStreamId stream_id,
2282                              bool fin,
2283                              const SpdyHeaderBlock& headers) {
2284   CHECK(in_io_loop_);
2285
2286   base::Time response_time = base::Time::Now();
2287   base::TimeTicks recv_first_byte_time = time_func_();
2288
2289   if (net_log().IsLogging()) {
2290     net_log().AddEvent(
2291         NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
2292         base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2293                    &headers, fin, stream_id));
2294   }
2295
2296   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2297   if (it == active_streams_.end()) {
2298     // NOTE:  it may just be that the stream was cancelled.
2299     return;
2300   }
2301
2302   SpdyStream* stream = it->second.stream;
2303   CHECK_EQ(stream->stream_id(), stream_id);
2304
2305   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2306   last_compressed_frame_len_ = 0;
2307
2308   if (GetProtocolVersion() >= SPDY4) {
2309     const std::string& error =
2310         "SPDY4 wasn't expecting SYN_REPLY.";
2311     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2312     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2313     return;
2314   }
2315   if (!it->second.waiting_for_syn_reply) {
2316     const std::string& error =
2317         "Received duplicate SYN_REPLY for stream.";
2318     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2319     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2320     return;
2321   }
2322   it->second.waiting_for_syn_reply = false;
2323
2324   ignore_result(OnInitialResponseHeadersReceived(
2325       headers, response_time, recv_first_byte_time, stream));
2326 }
2327
2328 void SpdySession::OnHeaders(SpdyStreamId stream_id,
2329                             bool fin,
2330                             const SpdyHeaderBlock& headers) {
2331   CHECK(in_io_loop_);
2332
2333   if (net_log().IsLogging()) {
2334     net_log().AddEvent(
2335         NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
2336         base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2337                    &headers, fin, stream_id));
2338   }
2339
2340   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2341   if (it == active_streams_.end()) {
2342     // NOTE:  it may just be that the stream was cancelled.
2343     LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
2344     return;
2345   }
2346
2347   SpdyStream* stream = it->second.stream;
2348   CHECK_EQ(stream->stream_id(), stream_id);
2349
2350   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2351   last_compressed_frame_len_ = 0;
2352
2353   base::Time response_time = base::Time::Now();
2354   base::TimeTicks recv_first_byte_time = time_func_();
2355
2356   if (it->second.waiting_for_syn_reply) {
2357     if (GetProtocolVersion() < SPDY4) {
2358       const std::string& error =
2359           "Was expecting SYN_REPLY, not HEADERS.";
2360       stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2361       ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2362       return;
2363     }
2364
2365     it->second.waiting_for_syn_reply = false;
2366     ignore_result(OnInitialResponseHeadersReceived(
2367         headers, response_time, recv_first_byte_time, stream));
2368   } else if (it->second.stream->IsReservedRemote()) {
2369     ignore_result(OnInitialResponseHeadersReceived(
2370         headers, response_time, recv_first_byte_time, stream));
2371   } else {
2372     int rv = stream->OnAdditionalResponseHeadersReceived(headers);
2373     if (rv < 0) {
2374       DCHECK_NE(rv, ERR_IO_PENDING);
2375       DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2376     }
2377   }
2378 }
2379
2380 bool SpdySession::OnUnknownFrame(SpdyStreamId stream_id, int frame_type) {
2381   // Validate stream id.
2382   // Was the frame sent on a stream id that has not been used in this session?
2383   if (stream_id % 2 == 1 && stream_id > stream_hi_water_mark_)
2384     return false;
2385
2386   if (stream_id % 2 == 0 && stream_id > last_accepted_push_stream_id_)
2387     return false;
2388
2389   return true;
2390 }
2391
2392 void SpdySession::OnRstStream(SpdyStreamId stream_id,
2393                               SpdyRstStreamStatus status) {
2394   CHECK(in_io_loop_);
2395
2396   std::string description;
2397   net_log().AddEvent(
2398       NetLog::TYPE_SPDY_SESSION_RST_STREAM,
2399       base::Bind(&NetLogSpdyRstCallback,
2400                  stream_id, status, &description));
2401
2402   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2403   if (it == active_streams_.end()) {
2404     // NOTE:  it may just be that the stream was cancelled.
2405     LOG(WARNING) << "Received RST for invalid stream" << stream_id;
2406     return;
2407   }
2408
2409   CHECK_EQ(it->second.stream->stream_id(), stream_id);
2410
2411   if (status == 0) {
2412     it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
2413   } else if (status == RST_STREAM_REFUSED_STREAM) {
2414     CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM);
2415   } else {
2416     RecordProtocolErrorHistogram(
2417         PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
2418     it->second.stream->LogStreamError(
2419         ERR_SPDY_PROTOCOL_ERROR,
2420         base::StringPrintf("SPDY stream closed with status: %d", status));
2421     // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
2422     //                For now, it doesn't matter much - it is a protocol error.
2423     CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
2424   }
2425 }
2426
2427 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
2428                            SpdyGoAwayStatus status) {
2429   CHECK(in_io_loop_);
2430
2431   // TODO(jgraettinger): UMA histogram on |status|.
2432
2433   net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
2434       base::Bind(&NetLogSpdyGoAwayCallback,
2435                  last_accepted_stream_id,
2436                  active_streams_.size(),
2437                  unclaimed_pushed_streams_.size(),
2438                  status));
2439   MakeUnavailable();
2440   StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
2441   // This is to handle the case when we already don't have any active
2442   // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
2443   // active streams and so the last one being closed will finish the
2444   // going away process (see DeleteStream()).
2445   MaybeFinishGoingAway();
2446 }
2447
2448 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) {
2449   CHECK(in_io_loop_);
2450
2451   net_log_.AddEvent(
2452       NetLog::TYPE_SPDY_SESSION_PING,
2453       base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received"));
2454
2455   // Send response to a PING from server.
2456   if ((protocol_ >= kProtoSPDY4 && !is_ack) ||
2457       (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) {
2458     WritePingFrame(unique_id, true);
2459     return;
2460   }
2461
2462   --pings_in_flight_;
2463   if (pings_in_flight_ < 0) {
2464     RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
2465     DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
2466     pings_in_flight_ = 0;
2467     return;
2468   }
2469
2470   if (pings_in_flight_ > 0)
2471     return;
2472
2473   // We will record RTT in histogram when there are no more client sent
2474   // pings_in_flight_.
2475   RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
2476 }
2477
2478 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
2479                                  uint32 delta_window_size) {
2480   CHECK(in_io_loop_);
2481
2482   DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
2483   net_log_.AddEvent(
2484       NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
2485       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2486                  stream_id, delta_window_size));
2487
2488   if (stream_id == kSessionFlowControlStreamId) {
2489     // WINDOW_UPDATE for the session.
2490     if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
2491       LOG(WARNING) << "Received WINDOW_UPDATE for session when "
2492                    << "session flow control is not turned on";
2493       // TODO(akalin): Record an error and close the session.
2494       return;
2495     }
2496
2497     if (delta_window_size < 1u) {
2498       RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2499       DoDrainSession(
2500           ERR_SPDY_PROTOCOL_ERROR,
2501           "Received WINDOW_UPDATE with an invalid delta_window_size " +
2502               base::UintToString(delta_window_size));
2503       return;
2504     }
2505
2506     IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
2507   } else {
2508     // WINDOW_UPDATE for a stream.
2509     if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2510       // TODO(akalin): Record an error and close the session.
2511       LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
2512                    << " when flow control is not turned on";
2513       return;
2514     }
2515
2516     ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2517
2518     if (it == active_streams_.end()) {
2519       // NOTE:  it may just be that the stream was cancelled.
2520       LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
2521       return;
2522     }
2523
2524     SpdyStream* stream = it->second.stream;
2525     CHECK_EQ(stream->stream_id(), stream_id);
2526
2527     if (delta_window_size < 1u) {
2528       ResetStreamIterator(it,
2529                           RST_STREAM_FLOW_CONTROL_ERROR,
2530                           base::StringPrintf(
2531                               "Received WINDOW_UPDATE with an invalid "
2532                               "delta_window_size %ud", delta_window_size));
2533       return;
2534     }
2535
2536     CHECK_EQ(it->second.stream->stream_id(), stream_id);
2537     it->second.stream->IncreaseSendWindowSize(
2538         static_cast<int32>(delta_window_size));
2539   }
2540 }
2541
2542 bool SpdySession::TryCreatePushStream(SpdyStreamId stream_id,
2543                                       SpdyStreamId associated_stream_id,
2544                                       SpdyPriority priority,
2545                                       const SpdyHeaderBlock& headers) {
2546   // Server-initiated streams should have even sequence numbers.
2547   if ((stream_id & 0x1) != 0) {
2548     LOG(WARNING) << "Received invalid push stream id " << stream_id;
2549     if (GetProtocolVersion() > SPDY2)
2550       CloseSessionOnError(ERR_SPDY_PROTOCOL_ERROR, "Odd push stream id.");
2551     return false;
2552   }
2553
2554   if (GetProtocolVersion() > SPDY2) {
2555     if (stream_id <= last_accepted_push_stream_id_) {
2556       LOG(WARNING) << "Received push stream id lesser or equal to the last "
2557                    << "accepted before " << stream_id;
2558       CloseSessionOnError(
2559           ERR_SPDY_PROTOCOL_ERROR,
2560           "New push stream id must be greater than the last accepted.");
2561       return false;
2562     }
2563   }
2564
2565   if (IsStreamActive(stream_id)) {
2566     // For SPDY3 and higher we should not get here, we'll start going away
2567     // earlier on |last_seen_push_stream_id_| check.
2568     CHECK_GT(SPDY3, GetProtocolVersion());
2569     LOG(WARNING) << "Received push for active stream " << stream_id;
2570     return false;
2571   }
2572
2573   last_accepted_push_stream_id_ = stream_id;
2574
2575   RequestPriority request_priority =
2576       ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion());
2577
2578   if (availability_state_ == STATE_GOING_AWAY) {
2579     // TODO(akalin): This behavior isn't in the SPDY spec, although it
2580     // probably should be.
2581     EnqueueResetStreamFrame(stream_id,
2582                             request_priority,
2583                             RST_STREAM_REFUSED_STREAM,
2584                             "push stream request received when going away");
2585     return false;
2586   }
2587
2588   if (associated_stream_id == 0) {
2589     // In SPDY4 0 stream id in PUSH_PROMISE frame leads to framer error and
2590     // session going away. We should never get here.
2591     CHECK_GT(SPDY4, GetProtocolVersion());
2592     std::string description = base::StringPrintf(
2593         "Received invalid associated stream id %d for pushed stream %d",
2594         associated_stream_id,
2595         stream_id);
2596     EnqueueResetStreamFrame(
2597         stream_id, request_priority, RST_STREAM_REFUSED_STREAM, description);
2598     return false;
2599   }
2600
2601   streams_pushed_count_++;
2602
2603   // TODO(mbelshe): DCHECK that this is a GET method?
2604
2605   // Verify that the response had a URL for us.
2606   GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true);
2607   if (!gurl.is_valid()) {
2608     EnqueueResetStreamFrame(stream_id,
2609                             request_priority,
2610                             RST_STREAM_PROTOCOL_ERROR,
2611                             "Pushed stream url was invalid: " + gurl.spec());
2612     return false;
2613   }
2614
2615   // Verify we have a valid stream association.
2616   ActiveStreamMap::iterator associated_it =
2617       active_streams_.find(associated_stream_id);
2618   if (associated_it == active_streams_.end()) {
2619     EnqueueResetStreamFrame(
2620         stream_id,
2621         request_priority,
2622         RST_STREAM_INVALID_STREAM,
2623         base::StringPrintf("Received push for inactive associated stream %d",
2624                            associated_stream_id));
2625     return false;
2626   }
2627
2628   // Check that the pushed stream advertises the same origin as its associated
2629   // stream. Bypass this check if and only if this session is with a SPDY proxy
2630   // that is trusted explicitly via the --trusted-spdy-proxy switch.
2631   if (trusted_spdy_proxy_.Equals(host_port_pair())) {
2632     // Disallow pushing of HTTPS content.
2633     if (gurl.SchemeIs("https")) {
2634       EnqueueResetStreamFrame(
2635           stream_id,
2636           request_priority,
2637           RST_STREAM_REFUSED_STREAM,
2638           base::StringPrintf("Rejected push of Cross Origin HTTPS content %d",
2639                              associated_stream_id));
2640     }
2641   } else {
2642     GURL associated_url(associated_it->second.stream->GetUrlFromHeaders());
2643     if (associated_url.GetOrigin() != gurl.GetOrigin()) {
2644       EnqueueResetStreamFrame(
2645           stream_id,
2646           request_priority,
2647           RST_STREAM_REFUSED_STREAM,
2648           base::StringPrintf("Rejected Cross Origin Push Stream %d",
2649                              associated_stream_id));
2650       return false;
2651     }
2652   }
2653
2654   // There should not be an existing pushed stream with the same path.
2655   PushedStreamMap::iterator pushed_it =
2656       unclaimed_pushed_streams_.lower_bound(gurl);
2657   if (pushed_it != unclaimed_pushed_streams_.end() &&
2658       pushed_it->first == gurl) {
2659     EnqueueResetStreamFrame(
2660         stream_id,
2661         request_priority,
2662         RST_STREAM_PROTOCOL_ERROR,
2663         "Received duplicate pushed stream with url: " + gurl.spec());
2664     return false;
2665   }
2666
2667   scoped_ptr<SpdyStream> stream(new SpdyStream(SPDY_PUSH_STREAM,
2668                                                GetWeakPtr(),
2669                                                gurl,
2670                                                request_priority,
2671                                                stream_initial_send_window_size_,
2672                                                stream_initial_recv_window_size_,
2673                                                net_log_));
2674   stream->set_stream_id(stream_id);
2675
2676   // In spdy4/http2 PUSH_PROMISE arrives on associated stream.
2677   if (associated_it != active_streams_.end() && GetProtocolVersion() >= SPDY4) {
2678     associated_it->second.stream->IncrementRawReceivedBytes(
2679         last_compressed_frame_len_);
2680   } else {
2681     stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2682   }
2683
2684   last_compressed_frame_len_ = 0;
2685
2686   DeleteExpiredPushedStreams();
2687   PushedStreamMap::iterator inserted_pushed_it =
2688       unclaimed_pushed_streams_.insert(
2689           pushed_it,
2690           std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_())));
2691   DCHECK(inserted_pushed_it != pushed_it);
2692
2693   InsertActivatedStream(stream.Pass());
2694
2695   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2696   if (active_it == active_streams_.end()) {
2697     NOTREACHED();
2698     return false;
2699   }
2700
2701   active_it->second.stream->OnPushPromiseHeadersReceived(headers);
2702   DCHECK(active_it->second.stream->IsReservedRemote());
2703   num_pushed_streams_++;
2704   return true;
2705 }
2706
2707 void SpdySession::OnPushPromise(SpdyStreamId stream_id,
2708                                 SpdyStreamId promised_stream_id,
2709                                 const SpdyHeaderBlock& headers) {
2710   CHECK(in_io_loop_);
2711
2712   if (net_log_.IsLogging()) {
2713     net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_PUSH_PROMISE,
2714                       base::Bind(&NetLogSpdyPushPromiseReceivedCallback,
2715                                  &headers,
2716                                  stream_id,
2717                                  promised_stream_id));
2718   }
2719
2720   // Any priority will do.
2721   // TODO(baranovich): pass parent stream id priority?
2722   if (!TryCreatePushStream(promised_stream_id, stream_id, 0, headers))
2723     return;
2724
2725   base::StatsCounter push_requests("spdy.pushed_streams");
2726   push_requests.Increment();
2727 }
2728
2729 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id,
2730                                          uint32 delta_window_size) {
2731   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2732   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2733   CHECK(it != active_streams_.end());
2734   CHECK_EQ(it->second.stream->stream_id(), stream_id);
2735   SendWindowUpdateFrame(
2736       stream_id, delta_window_size, it->second.stream->priority());
2737 }
2738
2739 void SpdySession::SendInitialData() {
2740   DCHECK(enable_sending_initial_data_);
2741
2742   if (send_connection_header_prefix_) {
2743     DCHECK_EQ(protocol_, kProtoSPDY4);
2744     scoped_ptr<SpdyFrame> connection_header_prefix_frame(
2745         new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix),
2746                       kHttp2ConnectionHeaderPrefixSize,
2747                       false /* take_ownership */));
2748     // Count the prefix as part of the subsequent SETTINGS frame.
2749     EnqueueSessionWrite(HIGHEST, SETTINGS,
2750                         connection_header_prefix_frame.Pass());
2751   }
2752
2753   // First, notify the server about the settings they should use when
2754   // communicating with us.
2755   SettingsMap settings_map;
2756   // Create a new settings frame notifying the server of our
2757   // max concurrent streams and initial window size.
2758   settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] =
2759       SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams);
2760   if (flow_control_state_ >= FLOW_CONTROL_STREAM &&
2761       stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) {
2762     settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
2763         SettingsFlagsAndValue(SETTINGS_FLAG_NONE,
2764                               stream_initial_recv_window_size_);
2765   }
2766   SendSettings(settings_map);
2767
2768   // Next, notify the server about our initial recv window size.
2769   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2770     // Bump up the receive window size to the real initial value. This
2771     // has to go here since the WINDOW_UPDATE frame sent by
2772     // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|.
2773     DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_);
2774     // This condition implies that |kDefaultInitialRecvWindowSize| -
2775     // |session_recv_window_size_| doesn't overflow.
2776     DCHECK_GT(session_recv_window_size_, 0);
2777     IncreaseRecvWindowSize(
2778         kDefaultInitialRecvWindowSize - session_recv_window_size_);
2779   }
2780
2781   if (protocol_ <= kProtoSPDY31) {
2782     // Finally, notify the server about the settings they have
2783     // previously told us to use when communicating with them (after
2784     // applying them).
2785     const SettingsMap& server_settings_map =
2786         http_server_properties_->GetSpdySettings(host_port_pair());
2787     if (server_settings_map.empty())
2788       return;
2789
2790     SettingsMap::const_iterator it =
2791         server_settings_map.find(SETTINGS_CURRENT_CWND);
2792     uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0;
2793     UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100);
2794
2795     for (SettingsMap::const_iterator it = server_settings_map.begin();
2796          it != server_settings_map.end(); ++it) {
2797       const SpdySettingsIds new_id = it->first;
2798       const uint32 new_val = it->second.second;
2799       HandleSetting(new_id, new_val);
2800     }
2801
2802     SendSettings(server_settings_map);
2803   }
2804 }
2805
2806
2807 void SpdySession::SendSettings(const SettingsMap& settings) {
2808   const SpdyMajorVersion protocol_version = GetProtocolVersion();
2809   net_log_.AddEvent(
2810       NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
2811       base::Bind(&NetLogSpdySendSettingsCallback, &settings, protocol_version));
2812   // Create the SETTINGS frame and send it.
2813   DCHECK(buffered_spdy_framer_.get());
2814   scoped_ptr<SpdyFrame> settings_frame(
2815       buffered_spdy_framer_->CreateSettings(settings));
2816   sent_settings_ = true;
2817   EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
2818 }
2819
2820 void SpdySession::HandleSetting(uint32 id, uint32 value) {
2821   switch (id) {
2822     case SETTINGS_MAX_CONCURRENT_STREAMS:
2823       max_concurrent_streams_ = std::min(static_cast<size_t>(value),
2824                                          kMaxConcurrentStreamLimit);
2825       ProcessPendingStreamRequests();
2826       break;
2827     case SETTINGS_INITIAL_WINDOW_SIZE: {
2828       if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2829         net_log().AddEvent(
2830             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL);
2831         return;
2832       }
2833
2834       if (value > static_cast<uint32>(kint32max)) {
2835         net_log().AddEvent(
2836             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE,
2837             NetLog::IntegerCallback("initial_window_size", value));
2838         return;
2839       }
2840
2841       // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only.
2842       int32 delta_window_size =
2843           static_cast<int32>(value) - stream_initial_send_window_size_;
2844       stream_initial_send_window_size_ = static_cast<int32>(value);
2845       UpdateStreamsSendWindowSize(delta_window_size);
2846       net_log().AddEvent(
2847           NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE,
2848           NetLog::IntegerCallback("delta_window_size", delta_window_size));
2849       break;
2850     }
2851   }
2852 }
2853
2854 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
2855   DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2856   for (ActiveStreamMap::iterator it = active_streams_.begin();
2857        it != active_streams_.end(); ++it) {
2858     it->second.stream->AdjustSendWindowSize(delta_window_size);
2859   }
2860
2861   for (CreatedStreamSet::const_iterator it = created_streams_.begin();
2862        it != created_streams_.end(); it++) {
2863     (*it)->AdjustSendWindowSize(delta_window_size);
2864   }
2865 }
2866
2867 void SpdySession::SendPrefacePingIfNoneInFlight() {
2868   if (pings_in_flight_ || !enable_ping_based_connection_checking_)
2869     return;
2870
2871   base::TimeTicks now = time_func_();
2872   // If there is no activity in the session, then send a preface-PING.
2873   if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
2874     SendPrefacePing();
2875 }
2876
2877 void SpdySession::SendPrefacePing() {
2878   WritePingFrame(next_ping_id_, false);
2879 }
2880
2881 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
2882                                         uint32 delta_window_size,
2883                                         RequestPriority priority) {
2884   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2885   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2886   if (it != active_streams_.end()) {
2887     CHECK_EQ(it->second.stream->stream_id(), stream_id);
2888   } else {
2889     CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2890     CHECK_EQ(stream_id, kSessionFlowControlStreamId);
2891   }
2892
2893   net_log_.AddEvent(
2894       NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
2895       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2896                  stream_id, delta_window_size));
2897
2898   DCHECK(buffered_spdy_framer_.get());
2899   scoped_ptr<SpdyFrame> window_update_frame(
2900       buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
2901   EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass());
2902 }
2903
2904 void SpdySession::WritePingFrame(uint32 unique_id, bool is_ack) {
2905   DCHECK(buffered_spdy_framer_.get());
2906   scoped_ptr<SpdyFrame> ping_frame(
2907       buffered_spdy_framer_->CreatePingFrame(unique_id, is_ack));
2908   EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass());
2909
2910   if (net_log().IsLogging()) {
2911     net_log().AddEvent(
2912         NetLog::TYPE_SPDY_SESSION_PING,
2913         base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "sent"));
2914   }
2915   if (!is_ack) {
2916     next_ping_id_ += 2;
2917     ++pings_in_flight_;
2918     PlanToCheckPingStatus();
2919     last_ping_sent_time_ = time_func_();
2920   }
2921 }
2922
2923 void SpdySession::PlanToCheckPingStatus() {
2924   if (check_ping_status_pending_)
2925     return;
2926
2927   check_ping_status_pending_ = true;
2928   base::MessageLoop::current()->PostDelayedTask(
2929       FROM_HERE,
2930       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2931                  time_func_()), hung_interval_);
2932 }
2933
2934 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
2935   CHECK(!in_io_loop_);
2936
2937   // Check if we got a response back for all PINGs we had sent.
2938   if (pings_in_flight_ == 0) {
2939     check_ping_status_pending_ = false;
2940     return;
2941   }
2942
2943   DCHECK(check_ping_status_pending_);
2944
2945   base::TimeTicks now = time_func_();
2946   base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
2947
2948   if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
2949     // Track all failed PING messages in a separate bucket.
2950     RecordPingRTTHistogram(base::TimeDelta::Max());
2951     DoDrainSession(ERR_SPDY_PING_FAILED, "Failed ping.");
2952     return;
2953   }
2954
2955   // Check the status of connection after a delay.
2956   base::MessageLoop::current()->PostDelayedTask(
2957       FROM_HERE,
2958       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2959                  now),
2960       delay);
2961 }
2962
2963 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) {
2964   UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration);
2965 }
2966
2967 void SpdySession::RecordProtocolErrorHistogram(
2968     SpdyProtocolErrorDetails details) {
2969   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details,
2970                             NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2971   if (EndsWith(host_port_pair().host(), "google.com", false)) {
2972     UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details,
2973                               NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2974   }
2975 }
2976
2977 void SpdySession::RecordHistograms() {
2978   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
2979                               streams_initiated_count_,
2980                               0, 300, 50);
2981   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
2982                               streams_pushed_count_,
2983                               0, 300, 50);
2984   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
2985                               streams_pushed_and_claimed_count_,
2986                               0, 300, 50);
2987   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
2988                               streams_abandoned_count_,
2989                               0, 300, 50);
2990   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
2991                             sent_settings_ ? 1 : 0, 2);
2992   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
2993                             received_settings_ ? 1 : 0, 2);
2994   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
2995                               stalled_streams_,
2996                               0, 300, 50);
2997   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
2998                             stalled_streams_ > 0 ? 1 : 0, 2);
2999
3000   if (received_settings_) {
3001     // Enumerate the saved settings, and set histograms for it.
3002     const SettingsMap& settings_map =
3003         http_server_properties_->GetSpdySettings(host_port_pair());
3004
3005     SettingsMap::const_iterator it;
3006     for (it = settings_map.begin(); it != settings_map.end(); ++it) {
3007       const SpdySettingsIds id = it->first;
3008       const uint32 val = it->second.second;
3009       switch (id) {
3010         case SETTINGS_CURRENT_CWND:
3011           // Record several different histograms to see if cwnd converges
3012           // for larger volumes of data being sent.
3013           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
3014                                       val, 1, 200, 100);
3015           if (total_bytes_received_ > 10 * 1024) {
3016             UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
3017                                         val, 1, 200, 100);
3018             if (total_bytes_received_ > 25 * 1024) {
3019               UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
3020                                           val, 1, 200, 100);
3021               if (total_bytes_received_ > 50 * 1024) {
3022                 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
3023                                             val, 1, 200, 100);
3024                 if (total_bytes_received_ > 100 * 1024) {
3025                   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
3026                                               val, 1, 200, 100);
3027                 }
3028               }
3029             }
3030           }
3031           break;
3032         case SETTINGS_ROUND_TRIP_TIME:
3033           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
3034                                       val, 1, 1200, 100);
3035           break;
3036         case SETTINGS_DOWNLOAD_RETRANS_RATE:
3037           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
3038                                       val, 1, 100, 50);
3039           break;
3040         default:
3041           break;
3042       }
3043     }
3044   }
3045 }
3046
3047 void SpdySession::CompleteStreamRequest(
3048     const base::WeakPtr<SpdyStreamRequest>& pending_request) {
3049   // Abort if the request has already been cancelled.
3050   if (!pending_request)
3051     return;
3052
3053   base::WeakPtr<SpdyStream> stream;
3054   int rv = TryCreateStream(pending_request, &stream);
3055
3056   if (rv == OK) {
3057     DCHECK(stream);
3058     pending_request->OnRequestCompleteSuccess(stream);
3059     return;
3060   }
3061   DCHECK(!stream);
3062
3063   if (rv != ERR_IO_PENDING) {
3064     pending_request->OnRequestCompleteFailure(rv);
3065   }
3066 }
3067
3068 SSLClientSocket* SpdySession::GetSSLClientSocket() const {
3069   if (!is_secure_)
3070     return NULL;
3071   SSLClientSocket* ssl_socket =
3072       reinterpret_cast<SSLClientSocket*>(connection_->socket());
3073   DCHECK(ssl_socket);
3074   return ssl_socket;
3075 }
3076
3077 void SpdySession::OnWriteBufferConsumed(
3078     size_t frame_payload_size,
3079     size_t consume_size,
3080     SpdyBuffer::ConsumeSource consume_source) {
3081   // We can be called with |in_io_loop_| set if a write SpdyBuffer is
3082   // deleted (e.g., a stream is closed due to incoming data).
3083
3084   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3085
3086   if (consume_source == SpdyBuffer::DISCARD) {
3087     // If we're discarding a frame or part of it, increase the send
3088     // window by the number of discarded bytes. (Although if we're
3089     // discarding part of a frame, it's probably because of a write
3090     // error and we'll be tearing down the session soon.)
3091     size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
3092     DCHECK_GT(remaining_payload_bytes, 0u);
3093     IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
3094   }
3095   // For consumed bytes, the send window is increased when we receive
3096   // a WINDOW_UPDATE frame.
3097 }
3098
3099 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
3100   // We can be called with |in_io_loop_| set if a SpdyBuffer is
3101   // deleted (e.g., a stream is closed due to incoming data).
3102
3103   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3104   DCHECK_GE(delta_window_size, 1);
3105
3106   // Check for overflow.
3107   int32 max_delta_window_size = kint32max - session_send_window_size_;
3108   if (delta_window_size > max_delta_window_size) {
3109     RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
3110     DoDrainSession(
3111         ERR_SPDY_PROTOCOL_ERROR,
3112         "Received WINDOW_UPDATE [delta: " +
3113             base::IntToString(delta_window_size) +
3114             "] for session overflows session_send_window_size_ [current: " +
3115             base::IntToString(session_send_window_size_) + "]");
3116     return;
3117   }
3118
3119   session_send_window_size_ += delta_window_size;
3120
3121   net_log_.AddEvent(
3122       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
3123       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3124                  delta_window_size, session_send_window_size_));
3125
3126   DCHECK(!IsSendStalled());
3127   ResumeSendStalledStreams();
3128 }
3129
3130 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
3131   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3132
3133   // We only call this method when sending a frame. Therefore,
3134   // |delta_window_size| should be within the valid frame size range.
3135   DCHECK_GE(delta_window_size, 1);
3136   DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
3137
3138   // |send_window_size_| should have been at least |delta_window_size| for
3139   // this call to happen.
3140   DCHECK_GE(session_send_window_size_, delta_window_size);
3141
3142   session_send_window_size_ -= delta_window_size;
3143
3144   net_log_.AddEvent(
3145       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
3146       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3147                  -delta_window_size, session_send_window_size_));
3148 }
3149
3150 void SpdySession::OnReadBufferConsumed(
3151     size_t consume_size,
3152     SpdyBuffer::ConsumeSource consume_source) {
3153   // We can be called with |in_io_loop_| set if a read SpdyBuffer is
3154   // deleted (e.g., discarded by a SpdyReadQueue).
3155
3156   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3157   DCHECK_GE(consume_size, 1u);
3158   DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
3159
3160   IncreaseRecvWindowSize(static_cast<int32>(consume_size));
3161 }
3162
3163 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
3164   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3165   DCHECK_GE(session_unacked_recv_window_bytes_, 0);
3166   DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
3167   DCHECK_GE(delta_window_size, 1);
3168   // Check for overflow.
3169   DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
3170
3171   session_recv_window_size_ += delta_window_size;
3172   net_log_.AddEvent(
3173       NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
3174       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3175                  delta_window_size, session_recv_window_size_));
3176
3177   session_unacked_recv_window_bytes_ += delta_window_size;
3178   if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) {
3179     SendWindowUpdateFrame(kSessionFlowControlStreamId,
3180                           session_unacked_recv_window_bytes_,
3181                           HIGHEST);
3182     session_unacked_recv_window_bytes_ = 0;
3183   }
3184 }
3185
3186 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
3187   CHECK(in_io_loop_);
3188   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3189   DCHECK_GE(delta_window_size, 1);
3190
3191   // Since we never decrease the initial receive window size,
3192   // |delta_window_size| should never cause |recv_window_size_| to go
3193   // negative. If we do, the receive window isn't being respected.
3194   if (delta_window_size > session_recv_window_size_) {
3195     RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
3196     DoDrainSession(
3197         ERR_SPDY_FLOW_CONTROL_ERROR,
3198         "delta_window_size is " + base::IntToString(delta_window_size) +
3199             " in DecreaseRecvWindowSize, which is larger than the receive " +
3200             "window size of " + base::IntToString(session_recv_window_size_));
3201     return;
3202   }
3203
3204   session_recv_window_size_ -= delta_window_size;
3205   net_log_.AddEvent(
3206       NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
3207       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3208                  -delta_window_size, session_recv_window_size_));
3209 }
3210
3211 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
3212   DCHECK(stream.send_stalled_by_flow_control());
3213   RequestPriority priority = stream.priority();
3214   CHECK_GE(priority, MINIMUM_PRIORITY);
3215   CHECK_LE(priority, MAXIMUM_PRIORITY);
3216   stream_send_unstall_queue_[priority].push_back(stream.stream_id());
3217 }
3218
3219 void SpdySession::ResumeSendStalledStreams() {
3220   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3221
3222   // We don't have to worry about new streams being queued, since
3223   // doing so would cause IsSendStalled() to return true. But we do
3224   // have to worry about streams being closed, as well as ourselves
3225   // being closed.
3226
3227   while (!IsSendStalled()) {
3228     size_t old_size = 0;
3229 #if DCHECK_IS_ON
3230     old_size = GetTotalSize(stream_send_unstall_queue_);
3231 #endif
3232
3233     SpdyStreamId stream_id = PopStreamToPossiblyResume();
3234     if (stream_id == 0)
3235       break;
3236     ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
3237     // The stream may actually still be send-stalled after this (due
3238     // to its own send window) but that's okay -- it'll then be
3239     // resumed once its send window increases.
3240     if (it != active_streams_.end())
3241       it->second.stream->PossiblyResumeIfSendStalled();
3242
3243     // The size should decrease unless we got send-stalled again.
3244     if (!IsSendStalled())
3245       DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
3246   }
3247 }
3248
3249 SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
3250   for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
3251     std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
3252     if (!queue->empty()) {
3253       SpdyStreamId stream_id = queue->front();
3254       queue->pop_front();
3255       return stream_id;
3256     }
3257   }
3258   return 0;
3259 }
3260
3261 }  // namespace net