Upstream version 8.37.180.0
[platform/framework/web/crosswalk.git] / src / net / spdy / spdy_session.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/spdy_session.h"
6
7 #include <algorithm>
8 #include <map>
9
10 #include "base/basictypes.h"
11 #include "base/bind.h"
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/metrics/field_trial.h"
16 #include "base/metrics/histogram.h"
17 #include "base/metrics/sparse_histogram.h"
18 #include "base/metrics/stats_counters.h"
19 #include "base/stl_util.h"
20 #include "base/strings/string_number_conversions.h"
21 #include "base/strings/string_util.h"
22 #include "base/strings/stringprintf.h"
23 #include "base/strings/utf_string_conversions.h"
24 #include "base/time/time.h"
25 #include "base/values.h"
26 #include "crypto/ec_private_key.h"
27 #include "crypto/ec_signature_creator.h"
28 #include "net/base/connection_type_histograms.h"
29 #include "net/base/net_log.h"
30 #include "net/base/net_util.h"
31 #include "net/cert/asn1_util.h"
32 #include "net/http/http_log_util.h"
33 #include "net/http/http_network_session.h"
34 #include "net/http/http_server_properties.h"
35 #include "net/http/http_util.h"
36 #include "net/spdy/spdy_buffer_producer.h"
37 #include "net/spdy/spdy_frame_builder.h"
38 #include "net/spdy/spdy_http_utils.h"
39 #include "net/spdy/spdy_protocol.h"
40 #include "net/spdy/spdy_session_pool.h"
41 #include "net/spdy/spdy_stream.h"
42 #include "net/ssl/server_bound_cert_service.h"
43 #include "net/ssl/ssl_cipher_suite_names.h"
44 #include "net/ssl/ssl_connection_status_flags.h"
45
46 namespace net {
47
48 namespace {
49
50 const int kReadBufferSize = 8 * 1024;
51 const int kDefaultConnectionAtRiskOfLossSeconds = 10;
52 const int kHungIntervalSeconds = 10;
53
54 // Minimum seconds that unclaimed pushed streams will be kept in memory.
55 const int kMinPushedStreamLifetimeSeconds = 300;
56
57 scoped_ptr<base::ListValue> SpdyHeaderBlockToListValue(
58     const SpdyHeaderBlock& headers,
59     net::NetLog::LogLevel log_level) {
60   scoped_ptr<base::ListValue> headers_list(new base::ListValue());
61   for (SpdyHeaderBlock::const_iterator it = headers.begin();
62        it != headers.end(); ++it) {
63     headers_list->AppendString(
64         it->first + ": " +
65         ElideHeaderValueForNetLog(log_level, it->first, it->second));
66   }
67   return headers_list.Pass();
68 }
69
70 base::Value* NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock* headers,
71                                              bool fin,
72                                              bool unidirectional,
73                                              SpdyPriority spdy_priority,
74                                              SpdyStreamId stream_id,
75                                              NetLog::LogLevel log_level) {
76   base::DictionaryValue* dict = new base::DictionaryValue();
77   dict->Set("headers",
78             SpdyHeaderBlockToListValue(*headers, log_level).release());
79   dict->SetBoolean("fin", fin);
80   dict->SetBoolean("unidirectional", unidirectional);
81   dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
82   dict->SetInteger("stream_id", stream_id);
83   return dict;
84 }
85
86 base::Value* NetLogSpdySynStreamReceivedCallback(
87     const SpdyHeaderBlock* headers,
88     bool fin,
89     bool unidirectional,
90     SpdyPriority spdy_priority,
91     SpdyStreamId stream_id,
92     SpdyStreamId associated_stream,
93     NetLog::LogLevel log_level) {
94   base::DictionaryValue* dict = new base::DictionaryValue();
95   dict->Set("headers",
96             SpdyHeaderBlockToListValue(*headers, log_level).release());
97   dict->SetBoolean("fin", fin);
98   dict->SetBoolean("unidirectional", unidirectional);
99   dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
100   dict->SetInteger("stream_id", stream_id);
101   dict->SetInteger("associated_stream", associated_stream);
102   return dict;
103 }
104
105 base::Value* NetLogSpdySynReplyOrHeadersReceivedCallback(
106     const SpdyHeaderBlock* headers,
107     bool fin,
108     SpdyStreamId stream_id,
109     NetLog::LogLevel log_level) {
110   base::DictionaryValue* dict = new base::DictionaryValue();
111   dict->Set("headers",
112             SpdyHeaderBlockToListValue(*headers, log_level).release());
113   dict->SetBoolean("fin", fin);
114   dict->SetInteger("stream_id", stream_id);
115   return dict;
116 }
117
118 base::Value* NetLogSpdySessionCloseCallback(int net_error,
119                                             const std::string* description,
120                                             NetLog::LogLevel /* log_level */) {
121   base::DictionaryValue* dict = new base::DictionaryValue();
122   dict->SetInteger("net_error", net_error);
123   dict->SetString("description", *description);
124   return dict;
125 }
126
127 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair,
128                                        NetLog::LogLevel /* log_level */) {
129   base::DictionaryValue* dict = new base::DictionaryValue();
130   dict->SetString("host", host_pair->first.ToString());
131   dict->SetString("proxy", host_pair->second.ToPacString());
132   return dict;
133 }
134
135 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair,
136                                         bool clear_persisted,
137                                         NetLog::LogLevel /* log_level */) {
138   base::DictionaryValue* dict = new base::DictionaryValue();
139   dict->SetString("host", host_port_pair.ToString());
140   dict->SetBoolean("clear_persisted", clear_persisted);
141   return dict;
142 }
143
144 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id,
145                                        SpdySettingsFlags flags,
146                                        uint32 value,
147                                        NetLog::LogLevel /* log_level */) {
148   base::DictionaryValue* dict = new base::DictionaryValue();
149   dict->SetInteger("id", id);
150   dict->SetInteger("flags", flags);
151   dict->SetInteger("value", value);
152   return dict;
153 }
154
155 base::Value* NetLogSpdySendSettingsCallback(const SettingsMap* settings,
156                                             NetLog::LogLevel /* log_level */) {
157   base::DictionaryValue* dict = new base::DictionaryValue();
158   base::ListValue* settings_list = new base::ListValue();
159   for (SettingsMap::const_iterator it = settings->begin();
160        it != settings->end(); ++it) {
161     const SpdySettingsIds id = it->first;
162     const SpdySettingsFlags flags = it->second.first;
163     const uint32 value = it->second.second;
164     settings_list->Append(new base::StringValue(
165         base::StringPrintf("[id:%u flags:%u value:%u]", id, flags, value)));
166   }
167   dict->Set("settings", settings_list);
168   return dict;
169 }
170
171 base::Value* NetLogSpdyWindowUpdateFrameCallback(
172     SpdyStreamId stream_id,
173     uint32 delta,
174     NetLog::LogLevel /* log_level */) {
175   base::DictionaryValue* dict = new base::DictionaryValue();
176   dict->SetInteger("stream_id", static_cast<int>(stream_id));
177   dict->SetInteger("delta", delta);
178   return dict;
179 }
180
181 base::Value* NetLogSpdySessionWindowUpdateCallback(
182     int32 delta,
183     int32 window_size,
184     NetLog::LogLevel /* log_level */) {
185   base::DictionaryValue* dict = new base::DictionaryValue();
186   dict->SetInteger("delta", delta);
187   dict->SetInteger("window_size", window_size);
188   return dict;
189 }
190
191 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id,
192                                     int size,
193                                     bool fin,
194                                     NetLog::LogLevel /* log_level */) {
195   base::DictionaryValue* dict = new base::DictionaryValue();
196   dict->SetInteger("stream_id", static_cast<int>(stream_id));
197   dict->SetInteger("size", size);
198   dict->SetBoolean("fin", fin);
199   return dict;
200 }
201
202 base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id,
203                                    int status,
204                                    const std::string* description,
205                                    NetLog::LogLevel /* log_level */) {
206   base::DictionaryValue* dict = new base::DictionaryValue();
207   dict->SetInteger("stream_id", static_cast<int>(stream_id));
208   dict->SetInteger("status", status);
209   dict->SetString("description", *description);
210   return dict;
211 }
212
213 base::Value* NetLogSpdyPingCallback(SpdyPingId unique_id,
214                                     bool is_ack,
215                                     const char* type,
216                                     NetLog::LogLevel /* log_level */) {
217   base::DictionaryValue* dict = new base::DictionaryValue();
218   dict->SetInteger("unique_id", unique_id);
219   dict->SetString("type", type);
220   dict->SetBoolean("is_ack", is_ack);
221   return dict;
222 }
223
224 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,
225                                       int active_streams,
226                                       int unclaimed_streams,
227                                       SpdyGoAwayStatus status,
228                                       NetLog::LogLevel /* log_level */) {
229   base::DictionaryValue* dict = new base::DictionaryValue();
230   dict->SetInteger("last_accepted_stream_id",
231                    static_cast<int>(last_stream_id));
232   dict->SetInteger("active_streams", active_streams);
233   dict->SetInteger("unclaimed_streams", unclaimed_streams);
234   dict->SetInteger("status", static_cast<int>(status));
235   return dict;
236 }
237
238 base::Value* NetLogSpdyPushPromiseReceivedCallback(
239     const SpdyHeaderBlock* headers,
240     SpdyStreamId stream_id,
241     SpdyStreamId promised_stream_id,
242     NetLog::LogLevel log_level) {
243   base::DictionaryValue* dict = new base::DictionaryValue();
244   dict->Set("headers",
245             SpdyHeaderBlockToListValue(*headers, log_level).release());
246   dict->SetInteger("id", stream_id);
247   dict->SetInteger("promised_stream_id", promised_stream_id);
248   return dict;
249 }
250
251 // Helper function to return the total size of an array of objects
252 // with .size() member functions.
253 template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
254   size_t total_size = 0;
255   for (size_t i = 0; i < N; ++i) {
256     total_size += arr[i].size();
257   }
258   return total_size;
259 }
260
261 // Helper class for std:find_if on STL container containing
262 // SpdyStreamRequest weak pointers.
263 class RequestEquals {
264  public:
265   RequestEquals(const base::WeakPtr<SpdyStreamRequest>& request)
266       : request_(request) {}
267
268   bool operator()(const base::WeakPtr<SpdyStreamRequest>& request) const {
269     return request_.get() == request.get();
270   }
271
272  private:
273   const base::WeakPtr<SpdyStreamRequest> request_;
274 };
275
276 // The maximum number of concurrent streams we will ever create.  Even if
277 // the server permits more, we will never exceed this limit.
278 const size_t kMaxConcurrentStreamLimit = 256;
279
280 }  // namespace
281
282 SpdyProtocolErrorDetails MapFramerErrorToProtocolError(
283     SpdyFramer::SpdyError err) {
284   switch(err) {
285     case SpdyFramer::SPDY_NO_ERROR:
286       return SPDY_ERROR_NO_ERROR;
287     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
288       return SPDY_ERROR_INVALID_CONTROL_FRAME;
289     case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
290       return SPDY_ERROR_CONTROL_PAYLOAD_TOO_LARGE;
291     case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
292       return SPDY_ERROR_ZLIB_INIT_FAILURE;
293     case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
294       return SPDY_ERROR_UNSUPPORTED_VERSION;
295     case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
296       return SPDY_ERROR_DECOMPRESS_FAILURE;
297     case SpdyFramer::SPDY_COMPRESS_FAILURE:
298       return SPDY_ERROR_COMPRESS_FAILURE;
299     case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
300       return SPDY_ERROR_GOAWAY_FRAME_CORRUPT;
301     case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
302       return SPDY_ERROR_RST_STREAM_FRAME_CORRUPT;
303     case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
304       return SPDY_ERROR_INVALID_DATA_FRAME_FLAGS;
305     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
306       return SPDY_ERROR_INVALID_CONTROL_FRAME_FLAGS;
307     case SpdyFramer::SPDY_UNEXPECTED_FRAME:
308       return SPDY_ERROR_UNEXPECTED_FRAME;
309     default:
310       NOTREACHED();
311       return static_cast<SpdyProtocolErrorDetails>(-1);
312   }
313 }
314
315 Error MapFramerErrorToNetError(SpdyFramer::SpdyError err) {
316   switch (err) {
317     case SpdyFramer::SPDY_NO_ERROR:
318       return OK;
319     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
320       return ERR_SPDY_PROTOCOL_ERROR;
321     case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
322       return ERR_SPDY_FRAME_SIZE_ERROR;
323     case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
324       return ERR_SPDY_COMPRESSION_ERROR;
325     case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
326       return ERR_SPDY_PROTOCOL_ERROR;
327     case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
328       return ERR_SPDY_COMPRESSION_ERROR;
329     case SpdyFramer::SPDY_COMPRESS_FAILURE:
330       return ERR_SPDY_COMPRESSION_ERROR;
331     case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
332       return ERR_SPDY_PROTOCOL_ERROR;
333     case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
334       return ERR_SPDY_PROTOCOL_ERROR;
335     case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
336       return ERR_SPDY_PROTOCOL_ERROR;
337     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
338       return ERR_SPDY_PROTOCOL_ERROR;
339     case SpdyFramer::SPDY_UNEXPECTED_FRAME:
340       return ERR_SPDY_PROTOCOL_ERROR;
341     default:
342       NOTREACHED();
343       return ERR_SPDY_PROTOCOL_ERROR;
344   }
345 }
346
347 SpdyProtocolErrorDetails MapRstStreamStatusToProtocolError(
348     SpdyRstStreamStatus status) {
349   switch(status) {
350     case RST_STREAM_PROTOCOL_ERROR:
351       return STATUS_CODE_PROTOCOL_ERROR;
352     case RST_STREAM_INVALID_STREAM:
353       return STATUS_CODE_INVALID_STREAM;
354     case RST_STREAM_REFUSED_STREAM:
355       return STATUS_CODE_REFUSED_STREAM;
356     case RST_STREAM_UNSUPPORTED_VERSION:
357       return STATUS_CODE_UNSUPPORTED_VERSION;
358     case RST_STREAM_CANCEL:
359       return STATUS_CODE_CANCEL;
360     case RST_STREAM_INTERNAL_ERROR:
361       return STATUS_CODE_INTERNAL_ERROR;
362     case RST_STREAM_FLOW_CONTROL_ERROR:
363       return STATUS_CODE_FLOW_CONTROL_ERROR;
364     case RST_STREAM_STREAM_IN_USE:
365       return STATUS_CODE_STREAM_IN_USE;
366     case RST_STREAM_STREAM_ALREADY_CLOSED:
367       return STATUS_CODE_STREAM_ALREADY_CLOSED;
368     case RST_STREAM_INVALID_CREDENTIALS:
369       return STATUS_CODE_INVALID_CREDENTIALS;
370     case RST_STREAM_FRAME_SIZE_ERROR:
371       return STATUS_CODE_FRAME_SIZE_ERROR;
372     case RST_STREAM_SETTINGS_TIMEOUT:
373       return STATUS_CODE_SETTINGS_TIMEOUT;
374     case RST_STREAM_CONNECT_ERROR:
375       return STATUS_CODE_CONNECT_ERROR;
376     case RST_STREAM_ENHANCE_YOUR_CALM:
377       return STATUS_CODE_ENHANCE_YOUR_CALM;
378     default:
379       NOTREACHED();
380       return static_cast<SpdyProtocolErrorDetails>(-1);
381   }
382 }
383
384 SpdyGoAwayStatus MapNetErrorToGoAwayStatus(Error err) {
385   switch (err) {
386     case OK:
387       return GOAWAY_NO_ERROR;
388     case ERR_SPDY_PROTOCOL_ERROR:
389       return GOAWAY_PROTOCOL_ERROR;
390     case ERR_SPDY_FLOW_CONTROL_ERROR:
391       return GOAWAY_FLOW_CONTROL_ERROR;
392     case ERR_SPDY_FRAME_SIZE_ERROR:
393       return GOAWAY_FRAME_SIZE_ERROR;
394     case ERR_SPDY_COMPRESSION_ERROR:
395       return GOAWAY_COMPRESSION_ERROR;
396     case ERR_SPDY_INADEQUATE_TRANSPORT_SECURITY:
397       return GOAWAY_INADEQUATE_SECURITY;
398     default:
399       return GOAWAY_PROTOCOL_ERROR;
400   }
401 }
402
403 void SplitPushedHeadersToRequestAndResponse(const SpdyHeaderBlock& headers,
404                                             SpdyMajorVersion protocol_version,
405                                             SpdyHeaderBlock* request_headers,
406                                             SpdyHeaderBlock* response_headers) {
407   DCHECK(response_headers);
408   DCHECK(request_headers);
409   for (SpdyHeaderBlock::const_iterator it = headers.begin();
410        it != headers.end();
411        ++it) {
412     SpdyHeaderBlock* to_insert = response_headers;
413     if (protocol_version == SPDY2) {
414       if (it->first == "url")
415         to_insert = request_headers;
416     } else {
417       const char* host = protocol_version >= SPDY4 ? ":authority" : ":host";
418       static const char* scheme = ":scheme";
419       static const char* path = ":path";
420       if (it->first == host || it->first == scheme || it->first == path)
421         to_insert = request_headers;
422     }
423     to_insert->insert(*it);
424   }
425 }
426
427 SpdyStreamRequest::SpdyStreamRequest() : weak_ptr_factory_(this) {
428   Reset();
429 }
430
431 SpdyStreamRequest::~SpdyStreamRequest() {
432   CancelRequest();
433 }
434
435 int SpdyStreamRequest::StartRequest(
436     SpdyStreamType type,
437     const base::WeakPtr<SpdySession>& session,
438     const GURL& url,
439     RequestPriority priority,
440     const BoundNetLog& net_log,
441     const CompletionCallback& callback) {
442   DCHECK(session);
443   DCHECK(!session_);
444   DCHECK(!stream_);
445   DCHECK(callback_.is_null());
446
447   type_ = type;
448   session_ = session;
449   url_ = url;
450   priority_ = priority;
451   net_log_ = net_log;
452   callback_ = callback;
453
454   base::WeakPtr<SpdyStream> stream;
455   int rv = session->TryCreateStream(weak_ptr_factory_.GetWeakPtr(), &stream);
456   if (rv == OK) {
457     Reset();
458     stream_ = stream;
459   }
460   return rv;
461 }
462
463 void SpdyStreamRequest::CancelRequest() {
464   if (session_)
465     session_->CancelStreamRequest(weak_ptr_factory_.GetWeakPtr());
466   Reset();
467   // Do this to cancel any pending CompleteStreamRequest() tasks.
468   weak_ptr_factory_.InvalidateWeakPtrs();
469 }
470
471 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() {
472   DCHECK(!session_);
473   base::WeakPtr<SpdyStream> stream = stream_;
474   DCHECK(stream);
475   Reset();
476   return stream;
477 }
478
479 void SpdyStreamRequest::OnRequestCompleteSuccess(
480     const base::WeakPtr<SpdyStream>& stream) {
481   DCHECK(session_);
482   DCHECK(!stream_);
483   DCHECK(!callback_.is_null());
484   CompletionCallback callback = callback_;
485   Reset();
486   DCHECK(stream);
487   stream_ = stream;
488   callback.Run(OK);
489 }
490
491 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) {
492   DCHECK(session_);
493   DCHECK(!stream_);
494   DCHECK(!callback_.is_null());
495   CompletionCallback callback = callback_;
496   Reset();
497   DCHECK_NE(rv, OK);
498   callback.Run(rv);
499 }
500
501 void SpdyStreamRequest::Reset() {
502   type_ = SPDY_BIDIRECTIONAL_STREAM;
503   session_.reset();
504   stream_.reset();
505   url_ = GURL();
506   priority_ = MINIMUM_PRIORITY;
507   net_log_ = BoundNetLog();
508   callback_.Reset();
509 }
510
511 SpdySession::ActiveStreamInfo::ActiveStreamInfo()
512     : stream(NULL),
513       waiting_for_syn_reply(false) {}
514
515 SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream)
516     : stream(stream),
517       waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {
518 }
519
520 SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {}
521
522 SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {}
523
524 SpdySession::PushedStreamInfo::PushedStreamInfo(
525     SpdyStreamId stream_id,
526     base::TimeTicks creation_time)
527     : stream_id(stream_id),
528       creation_time(creation_time) {}
529
530 SpdySession::PushedStreamInfo::~PushedStreamInfo() {}
531
532 SpdySession::SpdySession(
533     const SpdySessionKey& spdy_session_key,
534     const base::WeakPtr<HttpServerProperties>& http_server_properties,
535     bool verify_domain_authentication,
536     bool enable_sending_initial_data,
537     bool enable_compression,
538     bool enable_ping_based_connection_checking,
539     NextProto default_protocol,
540     size_t stream_initial_recv_window_size,
541     size_t initial_max_concurrent_streams,
542     size_t max_concurrent_streams_limit,
543     TimeFunc time_func,
544     const HostPortPair& trusted_spdy_proxy,
545     NetLog* net_log)
546     : in_io_loop_(false),
547       spdy_session_key_(spdy_session_key),
548       pool_(NULL),
549       http_server_properties_(http_server_properties),
550       read_buffer_(new IOBuffer(kReadBufferSize)),
551       stream_hi_water_mark_(kFirstStreamId),
552       in_flight_write_frame_type_(DATA),
553       in_flight_write_frame_size_(0),
554       is_secure_(false),
555       certificate_error_code_(OK),
556       availability_state_(STATE_AVAILABLE),
557       read_state_(READ_STATE_DO_READ),
558       write_state_(WRITE_STATE_IDLE),
559       error_on_close_(OK),
560       max_concurrent_streams_(initial_max_concurrent_streams == 0
561                                   ? kInitialMaxConcurrentStreams
562                                   : initial_max_concurrent_streams),
563       max_concurrent_streams_limit_(max_concurrent_streams_limit == 0
564                                         ? kMaxConcurrentStreamLimit
565                                         : max_concurrent_streams_limit),
566       streams_initiated_count_(0),
567       streams_pushed_count_(0),
568       streams_pushed_and_claimed_count_(0),
569       streams_abandoned_count_(0),
570       total_bytes_received_(0),
571       sent_settings_(false),
572       received_settings_(false),
573       stalled_streams_(0),
574       pings_in_flight_(0),
575       next_ping_id_(1),
576       last_activity_time_(time_func()),
577       last_compressed_frame_len_(0),
578       check_ping_status_pending_(false),
579       send_connection_header_prefix_(false),
580       flow_control_state_(FLOW_CONTROL_NONE),
581       stream_initial_send_window_size_(kSpdyStreamInitialWindowSize),
582       stream_initial_recv_window_size_(stream_initial_recv_window_size == 0
583                                            ? kDefaultInitialRecvWindowSize
584                                            : stream_initial_recv_window_size),
585       session_send_window_size_(0),
586       session_recv_window_size_(0),
587       session_unacked_recv_window_bytes_(0),
588       net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
589       verify_domain_authentication_(verify_domain_authentication),
590       enable_sending_initial_data_(enable_sending_initial_data),
591       enable_compression_(enable_compression),
592       enable_ping_based_connection_checking_(
593           enable_ping_based_connection_checking),
594       protocol_(default_protocol),
595       connection_at_risk_of_loss_time_(
596           base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
597       hung_interval_(base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
598       trusted_spdy_proxy_(trusted_spdy_proxy),
599       time_func_(time_func),
600       weak_factory_(this) {
601   DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
602   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
603   DCHECK(HttpStreamFactory::spdy_enabled());
604   net_log_.BeginEvent(
605       NetLog::TYPE_SPDY_SESSION,
606       base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
607   next_unclaimed_push_stream_sweep_time_ = time_func_() +
608       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
609   // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
610 }
611
612 SpdySession::~SpdySession() {
613   CHECK(!in_io_loop_);
614   DcheckDraining();
615
616   // TODO(akalin): Check connection->is_initialized() instead. This
617   // requires re-working CreateFakeSpdySession(), though.
618   DCHECK(connection_->socket());
619   // With SPDY we can't recycle sockets.
620   connection_->socket()->Disconnect();
621
622   RecordHistograms();
623
624   net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
625 }
626
627 void SpdySession::InitializeWithSocket(
628     scoped_ptr<ClientSocketHandle> connection,
629     SpdySessionPool* pool,
630     bool is_secure,
631     int certificate_error_code) {
632   CHECK(!in_io_loop_);
633   DCHECK_EQ(availability_state_, STATE_AVAILABLE);
634   DCHECK_EQ(read_state_, READ_STATE_DO_READ);
635   DCHECK_EQ(write_state_, WRITE_STATE_IDLE);
636   DCHECK(!connection_);
637
638   DCHECK(certificate_error_code == OK ||
639          certificate_error_code < ERR_IO_PENDING);
640   // TODO(akalin): Check connection->is_initialized() instead. This
641   // requires re-working CreateFakeSpdySession(), though.
642   DCHECK(connection->socket());
643
644   base::StatsCounter spdy_sessions("spdy.sessions");
645   spdy_sessions.Increment();
646
647   connection_ = connection.Pass();
648   is_secure_ = is_secure;
649   certificate_error_code_ = certificate_error_code;
650
651   NextProto protocol_negotiated =
652       connection_->socket()->GetNegotiatedProtocol();
653   if (protocol_negotiated != kProtoUnknown) {
654     protocol_ = protocol_negotiated;
655   }
656   DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
657   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
658
659   if (protocol_ == kProtoSPDY4)
660     send_connection_header_prefix_ = true;
661
662   if (protocol_ >= kProtoSPDY31) {
663     flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION;
664     session_send_window_size_ = kSpdySessionInitialWindowSize;
665     session_recv_window_size_ = kSpdySessionInitialWindowSize;
666   } else if (protocol_ >= kProtoSPDY3) {
667     flow_control_state_ = FLOW_CONTROL_STREAM;
668   } else {
669     flow_control_state_ = FLOW_CONTROL_NONE;
670   }
671
672   buffered_spdy_framer_.reset(
673       new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_),
674                              enable_compression_));
675   buffered_spdy_framer_->set_visitor(this);
676   buffered_spdy_framer_->set_debug_visitor(this);
677   UMA_HISTOGRAM_ENUMERATION("Net.SpdyVersion", protocol_, kProtoMaximumVersion);
678 #if defined(SPDY_PROXY_AUTH_ORIGIN)
679   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy",
680                         host_port_pair().Equals(HostPortPair::FromURL(
681                             GURL(SPDY_PROXY_AUTH_ORIGIN))));
682 #endif
683
684   net_log_.AddEvent(
685       NetLog::TYPE_SPDY_SESSION_INITIALIZED,
686       connection_->socket()->NetLog().source().ToEventParametersCallback());
687
688   DCHECK_EQ(availability_state_, STATE_AVAILABLE);
689   connection_->AddHigherLayeredPool(this);
690   if (enable_sending_initial_data_)
691     SendInitialData();
692   pool_ = pool;
693
694   // Bootstrap the read loop.
695   base::MessageLoop::current()->PostTask(
696       FROM_HERE,
697       base::Bind(&SpdySession::PumpReadLoop,
698                  weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
699 }
700
701 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
702   if (!verify_domain_authentication_)
703     return true;
704
705   if (availability_state_ == STATE_DRAINING)
706     return false;
707
708   SSLInfo ssl_info;
709   bool was_npn_negotiated;
710   NextProto protocol_negotiated = kProtoUnknown;
711   if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
712     return true;   // This is not a secure session, so all domains are okay.
713
714   bool unused = false;
715   return
716       !ssl_info.client_cert_sent &&
717       (!ssl_info.channel_id_sent ||
718        (ServerBoundCertService::GetDomainForHost(domain) ==
719         ServerBoundCertService::GetDomainForHost(host_port_pair().host()))) &&
720       ssl_info.cert->VerifyNameMatch(domain, &unused);
721 }
722
723 int SpdySession::GetPushStream(
724     const GURL& url,
725     base::WeakPtr<SpdyStream>* stream,
726     const BoundNetLog& stream_net_log) {
727   CHECK(!in_io_loop_);
728
729   stream->reset();
730
731   if (availability_state_ == STATE_DRAINING)
732     return ERR_CONNECTION_CLOSED;
733
734   Error err = TryAccessStream(url);
735   if (err != OK)
736     return err;
737
738   *stream = GetActivePushStream(url);
739   if (*stream) {
740     DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
741     streams_pushed_and_claimed_count_++;
742   }
743   return OK;
744 }
745
746 // {,Try}CreateStream() and TryAccessStream() can be called with
747 // |in_io_loop_| set if a stream is being created in response to
748 // another being closed due to received data.
749
750 Error SpdySession::TryAccessStream(const GURL& url) {
751   if (is_secure_ && certificate_error_code_ != OK &&
752       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
753     RecordProtocolErrorHistogram(
754         PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
755     DoDrainSession(
756         static_cast<Error>(certificate_error_code_),
757         "Tried to get SPDY stream for secure content over an unauthenticated "
758         "session.");
759     return ERR_SPDY_PROTOCOL_ERROR;
760   }
761   return OK;
762 }
763
764 int SpdySession::TryCreateStream(
765     const base::WeakPtr<SpdyStreamRequest>& request,
766     base::WeakPtr<SpdyStream>* stream) {
767   DCHECK(request);
768
769   if (availability_state_ == STATE_GOING_AWAY)
770     return ERR_FAILED;
771
772   if (availability_state_ == STATE_DRAINING)
773     return ERR_CONNECTION_CLOSED;
774
775   Error err = TryAccessStream(request->url());
776   if (err != OK)
777     return err;
778
779   if (!max_concurrent_streams_ ||
780       (active_streams_.size() + created_streams_.size() <
781        max_concurrent_streams_)) {
782     return CreateStream(*request, stream);
783   }
784
785   stalled_streams_++;
786   net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
787   RequestPriority priority = request->priority();
788   CHECK_GE(priority, MINIMUM_PRIORITY);
789   CHECK_LE(priority, MAXIMUM_PRIORITY);
790   pending_create_stream_queues_[priority].push_back(request);
791   return ERR_IO_PENDING;
792 }
793
794 int SpdySession::CreateStream(const SpdyStreamRequest& request,
795                               base::WeakPtr<SpdyStream>* stream) {
796   DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
797   DCHECK_LE(request.priority(), MAXIMUM_PRIORITY);
798
799   if (availability_state_ == STATE_GOING_AWAY)
800     return ERR_FAILED;
801
802   if (availability_state_ == STATE_DRAINING)
803     return ERR_CONNECTION_CLOSED;
804
805   Error err = TryAccessStream(request.url());
806   if (err != OK) {
807     // This should have been caught in TryCreateStream().
808     NOTREACHED();
809     return err;
810   }
811
812   DCHECK(connection_->socket());
813   DCHECK(connection_->socket()->IsConnected());
814   if (connection_->socket()) {
815     UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
816                           connection_->socket()->IsConnected());
817     if (!connection_->socket()->IsConnected()) {
818       DoDrainSession(
819           ERR_CONNECTION_CLOSED,
820           "Tried to create SPDY stream for a closed socket connection.");
821       return ERR_CONNECTION_CLOSED;
822     }
823   }
824
825   scoped_ptr<SpdyStream> new_stream(
826       new SpdyStream(request.type(), GetWeakPtr(), request.url(),
827                      request.priority(),
828                      stream_initial_send_window_size_,
829                      stream_initial_recv_window_size_,
830                      request.net_log()));
831   *stream = new_stream->GetWeakPtr();
832   InsertCreatedStream(new_stream.Pass());
833
834   UMA_HISTOGRAM_CUSTOM_COUNTS(
835       "Net.SpdyPriorityCount",
836       static_cast<int>(request.priority()), 0, 10, 11);
837
838   return OK;
839 }
840
841 void SpdySession::CancelStreamRequest(
842     const base::WeakPtr<SpdyStreamRequest>& request) {
843   DCHECK(request);
844   RequestPriority priority = request->priority();
845   CHECK_GE(priority, MINIMUM_PRIORITY);
846   CHECK_LE(priority, MAXIMUM_PRIORITY);
847
848 #if DCHECK_IS_ON
849   // |request| should not be in a queue not matching its priority.
850   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
851     if (priority == i)
852       continue;
853     PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
854     DCHECK(std::find_if(queue->begin(),
855                         queue->end(),
856                         RequestEquals(request)) == queue->end());
857   }
858 #endif
859
860   PendingStreamRequestQueue* queue =
861       &pending_create_stream_queues_[priority];
862   // Remove |request| from |queue| while preserving the order of the
863   // other elements.
864   PendingStreamRequestQueue::iterator it =
865       std::find_if(queue->begin(), queue->end(), RequestEquals(request));
866   // The request may already be removed if there's a
867   // CompleteStreamRequest() in flight.
868   if (it != queue->end()) {
869     it = queue->erase(it);
870     // |request| should be in the queue at most once, and if it is
871     // present, should not be pending completion.
872     DCHECK(std::find_if(it, queue->end(), RequestEquals(request)) ==
873            queue->end());
874   }
875 }
876
877 base::WeakPtr<SpdyStreamRequest> SpdySession::GetNextPendingStreamRequest() {
878   for (int j = MAXIMUM_PRIORITY; j >= MINIMUM_PRIORITY; --j) {
879     if (pending_create_stream_queues_[j].empty())
880       continue;
881
882     base::WeakPtr<SpdyStreamRequest> pending_request =
883         pending_create_stream_queues_[j].front();
884     DCHECK(pending_request);
885     pending_create_stream_queues_[j].pop_front();
886     return pending_request;
887   }
888   return base::WeakPtr<SpdyStreamRequest>();
889 }
890
891 void SpdySession::ProcessPendingStreamRequests() {
892   // Like |max_concurrent_streams_|, 0 means infinite for
893   // |max_requests_to_process|.
894   size_t max_requests_to_process = 0;
895   if (max_concurrent_streams_ != 0) {
896     max_requests_to_process =
897         max_concurrent_streams_ -
898         (active_streams_.size() + created_streams_.size());
899   }
900   for (size_t i = 0;
901        max_requests_to_process == 0 || i < max_requests_to_process; ++i) {
902     base::WeakPtr<SpdyStreamRequest> pending_request =
903         GetNextPendingStreamRequest();
904     if (!pending_request)
905       break;
906
907     // Note that this post can race with other stream creations, and it's
908     // possible that the un-stalled stream will be stalled again if it loses.
909     // TODO(jgraettinger): Provide stronger ordering guarantees.
910     base::MessageLoop::current()->PostTask(
911         FROM_HERE,
912         base::Bind(&SpdySession::CompleteStreamRequest,
913                    weak_factory_.GetWeakPtr(),
914                    pending_request));
915   }
916 }
917
918 void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) {
919   pooled_aliases_.insert(alias_key);
920 }
921
922 SpdyMajorVersion SpdySession::GetProtocolVersion() const {
923   DCHECK(buffered_spdy_framer_.get());
924   return buffered_spdy_framer_->protocol_version();
925 }
926
927 bool SpdySession::HasAcceptableTransportSecurity() const {
928   // If we're not even using TLS, we have no standards to meet.
929   if (!is_secure_) {
930     return true;
931   }
932
933   // We don't enforce transport security standards for older SPDY versions.
934   if (GetProtocolVersion() < SPDY4) {
935     return true;
936   }
937
938   SSLInfo ssl_info;
939   CHECK(connection_->socket()->GetSSLInfo(&ssl_info));
940
941   // HTTP/2 requires TLS 1.2+
942   if (SSLConnectionStatusToVersion(ssl_info.connection_status) <
943       SSL_CONNECTION_VERSION_TLS1_2) {
944     return false;
945   }
946
947   if (!IsSecureTLSCipherSuite(
948           SSLConnectionStatusToCipherSuite(ssl_info.connection_status))) {
949     return false;
950   }
951
952   return true;
953 }
954
955 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
956   return weak_factory_.GetWeakPtr();
957 }
958
959 bool SpdySession::CloseOneIdleConnection() {
960   CHECK(!in_io_loop_);
961   DCHECK(pool_);
962   if (active_streams_.empty()) {
963     DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
964   }
965   // Return false as the socket wasn't immediately closed.
966   return false;
967 }
968
969 void SpdySession::EnqueueStreamWrite(
970     const base::WeakPtr<SpdyStream>& stream,
971     SpdyFrameType frame_type,
972     scoped_ptr<SpdyBufferProducer> producer) {
973   DCHECK(frame_type == HEADERS ||
974          frame_type == DATA ||
975          frame_type == CREDENTIAL ||
976          frame_type == SYN_STREAM);
977   EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
978 }
979
980 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
981     SpdyStreamId stream_id,
982     RequestPriority priority,
983     SpdyControlFlags flags,
984     const SpdyHeaderBlock& headers) {
985   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
986   CHECK(it != active_streams_.end());
987   CHECK_EQ(it->second.stream->stream_id(), stream_id);
988
989   SendPrefacePingIfNoneInFlight();
990
991   DCHECK(buffered_spdy_framer_.get());
992   SpdyPriority spdy_priority =
993       ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion());
994   scoped_ptr<SpdyFrame> syn_frame(
995       buffered_spdy_framer_->CreateSynStream(stream_id, 0, spdy_priority, flags,
996                                              &headers));
997
998   base::StatsCounter spdy_requests("spdy.requests");
999   spdy_requests.Increment();
1000   streams_initiated_count_++;
1001
1002   if (net_log().IsLogging()) {
1003     net_log().AddEvent(
1004         NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
1005         base::Bind(&NetLogSpdySynStreamSentCallback, &headers,
1006                    (flags & CONTROL_FLAG_FIN) != 0,
1007                    (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
1008                    spdy_priority,
1009                    stream_id));
1010   }
1011
1012   return syn_frame.Pass();
1013 }
1014
1015 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
1016                                                      IOBuffer* data,
1017                                                      int len,
1018                                                      SpdyDataFlags flags) {
1019   if (availability_state_ == STATE_DRAINING) {
1020     return scoped_ptr<SpdyBuffer>();
1021   }
1022
1023   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
1024   CHECK(it != active_streams_.end());
1025   SpdyStream* stream = it->second.stream;
1026   CHECK_EQ(stream->stream_id(), stream_id);
1027
1028   if (len < 0) {
1029     NOTREACHED();
1030     return scoped_ptr<SpdyBuffer>();
1031   }
1032
1033   int effective_len = std::min(len, kMaxSpdyFrameChunkSize);
1034
1035   bool send_stalled_by_stream =
1036       (flow_control_state_ >= FLOW_CONTROL_STREAM) &&
1037       (stream->send_window_size() <= 0);
1038   bool send_stalled_by_session = IsSendStalled();
1039
1040   // NOTE: There's an enum of the same name in histograms.xml.
1041   enum SpdyFrameFlowControlState {
1042     SEND_NOT_STALLED,
1043     SEND_STALLED_BY_STREAM,
1044     SEND_STALLED_BY_SESSION,
1045     SEND_STALLED_BY_STREAM_AND_SESSION,
1046   };
1047
1048   SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED;
1049   if (send_stalled_by_stream) {
1050     if (send_stalled_by_session) {
1051       frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION;
1052     } else {
1053       frame_flow_control_state = SEND_STALLED_BY_STREAM;
1054     }
1055   } else if (send_stalled_by_session) {
1056     frame_flow_control_state = SEND_STALLED_BY_SESSION;
1057   }
1058
1059   if (flow_control_state_ == FLOW_CONTROL_STREAM) {
1060     UMA_HISTOGRAM_ENUMERATION(
1061         "Net.SpdyFrameStreamFlowControlState",
1062         frame_flow_control_state,
1063         SEND_STALLED_BY_STREAM + 1);
1064   } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1065     UMA_HISTOGRAM_ENUMERATION(
1066         "Net.SpdyFrameStreamAndSessionFlowControlState",
1067         frame_flow_control_state,
1068         SEND_STALLED_BY_STREAM_AND_SESSION + 1);
1069   }
1070
1071   // Obey send window size of the stream if stream flow control is
1072   // enabled.
1073   if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
1074     if (send_stalled_by_stream) {
1075       stream->set_send_stalled_by_flow_control(true);
1076       // Even though we're currently stalled only by the stream, we
1077       // might end up being stalled by the session also.
1078       QueueSendStalledStream(*stream);
1079       net_log().AddEvent(
1080           NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW,
1081           NetLog::IntegerCallback("stream_id", stream_id));
1082       return scoped_ptr<SpdyBuffer>();
1083     }
1084
1085     effective_len = std::min(effective_len, stream->send_window_size());
1086   }
1087
1088   // Obey send window size of the session if session flow control is
1089   // enabled.
1090   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1091     if (send_stalled_by_session) {
1092       stream->set_send_stalled_by_flow_control(true);
1093       QueueSendStalledStream(*stream);
1094       net_log().AddEvent(
1095           NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW,
1096           NetLog::IntegerCallback("stream_id", stream_id));
1097       return scoped_ptr<SpdyBuffer>();
1098     }
1099
1100     effective_len = std::min(effective_len, session_send_window_size_);
1101   }
1102
1103   DCHECK_GE(effective_len, 0);
1104
1105   // Clear FIN flag if only some of the data will be in the data
1106   // frame.
1107   if (effective_len < len)
1108     flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
1109
1110   if (net_log().IsLogging()) {
1111     net_log().AddEvent(
1112         NetLog::TYPE_SPDY_SESSION_SEND_DATA,
1113         base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len,
1114                    (flags & DATA_FLAG_FIN) != 0));
1115   }
1116
1117   // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
1118   if (effective_len > 0)
1119     SendPrefacePingIfNoneInFlight();
1120
1121   // TODO(mbelshe): reduce memory copies here.
1122   DCHECK(buffered_spdy_framer_.get());
1123   scoped_ptr<SpdyFrame> frame(
1124       buffered_spdy_framer_->CreateDataFrame(
1125           stream_id, data->data(),
1126           static_cast<uint32>(effective_len), flags));
1127
1128   scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));
1129
1130   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1131     DecreaseSendWindowSize(static_cast<int32>(effective_len));
1132     data_buffer->AddConsumeCallback(
1133         base::Bind(&SpdySession::OnWriteBufferConsumed,
1134                    weak_factory_.GetWeakPtr(),
1135                    static_cast<size_t>(effective_len)));
1136   }
1137
1138   return data_buffer.Pass();
1139 }
1140
1141 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) {
1142   DCHECK_NE(stream_id, 0u);
1143
1144   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1145   if (it == active_streams_.end()) {
1146     NOTREACHED();
1147     return;
1148   }
1149
1150   CloseActiveStreamIterator(it, status);
1151 }
1152
1153 void SpdySession::CloseCreatedStream(
1154     const base::WeakPtr<SpdyStream>& stream, int status) {
1155   DCHECK_EQ(stream->stream_id(), 0u);
1156
1157   CreatedStreamSet::iterator it = created_streams_.find(stream.get());
1158   if (it == created_streams_.end()) {
1159     NOTREACHED();
1160     return;
1161   }
1162
1163   CloseCreatedStreamIterator(it, status);
1164 }
1165
1166 void SpdySession::ResetStream(SpdyStreamId stream_id,
1167                               SpdyRstStreamStatus status,
1168                               const std::string& description) {
1169   DCHECK_NE(stream_id, 0u);
1170
1171   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1172   if (it == active_streams_.end()) {
1173     NOTREACHED();
1174     return;
1175   }
1176
1177   ResetStreamIterator(it, status, description);
1178 }
1179
1180 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
1181   return ContainsKey(active_streams_, stream_id);
1182 }
1183
1184 LoadState SpdySession::GetLoadState() const {
1185   // Just report that we're idle since the session could be doing
1186   // many things concurrently.
1187   return LOAD_STATE_IDLE;
1188 }
1189
1190 void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
1191                                             int status) {
1192   // TODO(mbelshe): We should send a RST_STREAM control frame here
1193   //                so that the server can cancel a large send.
1194
1195   scoped_ptr<SpdyStream> owned_stream(it->second.stream);
1196   active_streams_.erase(it);
1197
1198   // TODO(akalin): When SpdyStream was ref-counted (and
1199   // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
1200   // was only done when status was not OK. This meant that pushed
1201   // streams can still be claimed after they're closed. This is
1202   // probably something that we still want to support, although server
1203   // push is hardly used. Write tests for this and fix this. (See
1204   // http://crbug.com/261712 .)
1205   if (owned_stream->type() == SPDY_PUSH_STREAM)
1206     unclaimed_pushed_streams_.erase(owned_stream->url());
1207
1208   DeleteStream(owned_stream.Pass(), status);
1209   MaybeFinishGoingAway();
1210
1211   // If there are no active streams and the socket pool is stalled, close the
1212   // session to free up a socket slot.
1213   if (active_streams_.empty() && connection_->IsPoolStalled()) {
1214     DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1215   }
1216 }
1217
1218 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
1219                                              int status) {
1220   scoped_ptr<SpdyStream> owned_stream(*it);
1221   created_streams_.erase(it);
1222   DeleteStream(owned_stream.Pass(), status);
1223 }
1224
1225 void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it,
1226                                       SpdyRstStreamStatus status,
1227                                       const std::string& description) {
1228   // Send the RST_STREAM frame first as CloseActiveStreamIterator()
1229   // may close us.
1230   SpdyStreamId stream_id = it->first;
1231   RequestPriority priority = it->second.stream->priority();
1232   EnqueueResetStreamFrame(stream_id, priority, status, description);
1233
1234   // Removes any pending writes for the stream except for possibly an
1235   // in-flight one.
1236   CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
1237 }
1238
1239 void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id,
1240                                           RequestPriority priority,
1241                                           SpdyRstStreamStatus status,
1242                                           const std::string& description) {
1243   DCHECK_NE(stream_id, 0u);
1244
1245   net_log().AddEvent(
1246       NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
1247       base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));
1248
1249   DCHECK(buffered_spdy_framer_.get());
1250   scoped_ptr<SpdyFrame> rst_frame(
1251       buffered_spdy_framer_->CreateRstStream(stream_id, status));
1252
1253   EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
1254   RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status));
1255 }
1256
1257 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
1258   CHECK(!in_io_loop_);
1259   if (availability_state_ == STATE_DRAINING) {
1260     return;
1261   }
1262   ignore_result(DoReadLoop(expected_read_state, result));
1263 }
1264
1265 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
1266   CHECK(!in_io_loop_);
1267   CHECK_EQ(read_state_, expected_read_state);
1268
1269   in_io_loop_ = true;
1270
1271   int bytes_read_without_yielding = 0;
1272
1273   // Loop until the session is draining, the read becomes blocked, or
1274   // the read limit is exceeded.
1275   while (true) {
1276     switch (read_state_) {
1277       case READ_STATE_DO_READ:
1278         CHECK_EQ(result, OK);
1279         result = DoRead();
1280         break;
1281       case READ_STATE_DO_READ_COMPLETE:
1282         if (result > 0)
1283           bytes_read_without_yielding += result;
1284         result = DoReadComplete(result);
1285         break;
1286       default:
1287         NOTREACHED() << "read_state_: " << read_state_;
1288         break;
1289     }
1290
1291     if (availability_state_ == STATE_DRAINING)
1292       break;
1293
1294     if (result == ERR_IO_PENDING)
1295       break;
1296
1297     if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
1298       read_state_ = READ_STATE_DO_READ;
1299       base::MessageLoop::current()->PostTask(
1300           FROM_HERE,
1301           base::Bind(&SpdySession::PumpReadLoop,
1302                      weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
1303       result = ERR_IO_PENDING;
1304       break;
1305     }
1306   }
1307
1308   CHECK(in_io_loop_);
1309   in_io_loop_ = false;
1310
1311   return result;
1312 }
1313
1314 int SpdySession::DoRead() {
1315   CHECK(in_io_loop_);
1316
1317   CHECK(connection_);
1318   CHECK(connection_->socket());
1319   read_state_ = READ_STATE_DO_READ_COMPLETE;
1320   return connection_->socket()->Read(
1321       read_buffer_.get(),
1322       kReadBufferSize,
1323       base::Bind(&SpdySession::PumpReadLoop,
1324                  weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
1325 }
1326
1327 int SpdySession::DoReadComplete(int result) {
1328   CHECK(in_io_loop_);
1329
1330   // Parse a frame.  For now this code requires that the frame fit into our
1331   // buffer (kReadBufferSize).
1332   // TODO(mbelshe): support arbitrarily large frames!
1333
1334   if (result == 0) {
1335     UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
1336                                 total_bytes_received_, 1, 100000000, 50);
1337     DoDrainSession(ERR_CONNECTION_CLOSED, "Connection closed");
1338
1339     return ERR_CONNECTION_CLOSED;
1340   }
1341
1342   if (result < 0) {
1343     DoDrainSession(static_cast<Error>(result), "result is < 0.");
1344     return result;
1345   }
1346   CHECK_LE(result, kReadBufferSize);
1347   total_bytes_received_ += result;
1348
1349   last_activity_time_ = time_func_();
1350
1351   DCHECK(buffered_spdy_framer_.get());
1352   char* data = read_buffer_->data();
1353   while (result > 0) {
1354     uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
1355     result -= bytes_processed;
1356     data += bytes_processed;
1357
1358     if (availability_state_ == STATE_DRAINING) {
1359       return ERR_CONNECTION_CLOSED;
1360     }
1361
1362     DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
1363   }
1364
1365   read_state_ = READ_STATE_DO_READ;
1366   return OK;
1367 }
1368
1369 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
1370   CHECK(!in_io_loop_);
1371   DCHECK_EQ(write_state_, expected_write_state);
1372
1373   DoWriteLoop(expected_write_state, result);
1374
1375   if (availability_state_ == STATE_DRAINING && !in_flight_write_ &&
1376       write_queue_.IsEmpty()) {
1377     pool_->RemoveUnavailableSession(GetWeakPtr());  // Destroys |this|.
1378     return;
1379   }
1380 }
1381
1382 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
1383   CHECK(!in_io_loop_);
1384   DCHECK_NE(write_state_, WRITE_STATE_IDLE);
1385   DCHECK_EQ(write_state_, expected_write_state);
1386
1387   in_io_loop_ = true;
1388
1389   // Loop until the session is closed or the write becomes blocked.
1390   while (true) {
1391     switch (write_state_) {
1392       case WRITE_STATE_DO_WRITE:
1393         DCHECK_EQ(result, OK);
1394         result = DoWrite();
1395         break;
1396       case WRITE_STATE_DO_WRITE_COMPLETE:
1397         result = DoWriteComplete(result);
1398         break;
1399       case WRITE_STATE_IDLE:
1400       default:
1401         NOTREACHED() << "write_state_: " << write_state_;
1402         break;
1403     }
1404
1405     if (write_state_ == WRITE_STATE_IDLE) {
1406       DCHECK_EQ(result, ERR_IO_PENDING);
1407       break;
1408     }
1409
1410     if (result == ERR_IO_PENDING)
1411       break;
1412   }
1413
1414   CHECK(in_io_loop_);
1415   in_io_loop_ = false;
1416
1417   return result;
1418 }
1419
1420 int SpdySession::DoWrite() {
1421   CHECK(in_io_loop_);
1422
1423   DCHECK(buffered_spdy_framer_);
1424   if (in_flight_write_) {
1425     DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1426   } else {
1427     // Grab the next frame to send.
1428     SpdyFrameType frame_type = DATA;
1429     scoped_ptr<SpdyBufferProducer> producer;
1430     base::WeakPtr<SpdyStream> stream;
1431     if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
1432       write_state_ = WRITE_STATE_IDLE;
1433       return ERR_IO_PENDING;
1434     }
1435
1436     if (stream.get())
1437       CHECK(!stream->IsClosed());
1438
1439     // Activate the stream only when sending the SYN_STREAM frame to
1440     // guarantee monotonically-increasing stream IDs.
1441     if (frame_type == SYN_STREAM) {
1442       CHECK(stream.get());
1443       CHECK_EQ(stream->stream_id(), 0u);
1444       scoped_ptr<SpdyStream> owned_stream =
1445           ActivateCreatedStream(stream.get());
1446       InsertActivatedStream(owned_stream.Pass());
1447
1448       if (stream_hi_water_mark_ > kLastStreamId) {
1449         CHECK_EQ(stream->stream_id(), kLastStreamId);
1450         // We've exhausted the stream ID space, and no new streams may be
1451         // created after this one.
1452         MakeUnavailable();
1453         StartGoingAway(kLastStreamId, ERR_ABORTED);
1454       }
1455     }
1456
1457     in_flight_write_ = producer->ProduceBuffer();
1458     if (!in_flight_write_) {
1459       NOTREACHED();
1460       return ERR_UNEXPECTED;
1461     }
1462     in_flight_write_frame_type_ = frame_type;
1463     in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
1464     DCHECK_GE(in_flight_write_frame_size_,
1465               buffered_spdy_framer_->GetFrameMinimumSize());
1466     in_flight_write_stream_ = stream;
1467   }
1468
1469   write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;
1470
1471   // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
1472   // with Socket implementations that don't store their IOBuffer
1473   // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
1474   scoped_refptr<IOBuffer> write_io_buffer =
1475       in_flight_write_->GetIOBufferForRemainingData();
1476   return connection_->socket()->Write(
1477       write_io_buffer.get(),
1478       in_flight_write_->GetRemainingSize(),
1479       base::Bind(&SpdySession::PumpWriteLoop,
1480                  weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
1481 }
1482
1483 int SpdySession::DoWriteComplete(int result) {
1484   CHECK(in_io_loop_);
1485   DCHECK_NE(result, ERR_IO_PENDING);
1486   DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1487
1488   last_activity_time_ = time_func_();
1489
1490   if (result < 0) {
1491     DCHECK_NE(result, ERR_IO_PENDING);
1492     in_flight_write_.reset();
1493     in_flight_write_frame_type_ = DATA;
1494     in_flight_write_frame_size_ = 0;
1495     in_flight_write_stream_.reset();
1496     write_state_ = WRITE_STATE_DO_WRITE;
1497     DoDrainSession(static_cast<Error>(result), "Write error");
1498     return OK;
1499   }
1500
1501   // It should not be possible to have written more bytes than our
1502   // in_flight_write_.
1503   DCHECK_LE(static_cast<size_t>(result),
1504             in_flight_write_->GetRemainingSize());
1505
1506   if (result > 0) {
1507     in_flight_write_->Consume(static_cast<size_t>(result));
1508
1509     // We only notify the stream when we've fully written the pending frame.
1510     if (in_flight_write_->GetRemainingSize() == 0) {
1511       // It is possible that the stream was cancelled while we were
1512       // writing to the socket.
1513       if (in_flight_write_stream_.get()) {
1514         DCHECK_GT(in_flight_write_frame_size_, 0u);
1515         in_flight_write_stream_->OnFrameWriteComplete(
1516             in_flight_write_frame_type_,
1517             in_flight_write_frame_size_);
1518       }
1519
1520       // Cleanup the write which just completed.
1521       in_flight_write_.reset();
1522       in_flight_write_frame_type_ = DATA;
1523       in_flight_write_frame_size_ = 0;
1524       in_flight_write_stream_.reset();
1525     }
1526   }
1527
1528   write_state_ = WRITE_STATE_DO_WRITE;
1529   return OK;
1530 }
1531
1532 void SpdySession::DcheckGoingAway() const {
1533 #if DCHECK_IS_ON
1534   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1535   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
1536     DCHECK(pending_create_stream_queues_[i].empty());
1537   }
1538   DCHECK(created_streams_.empty());
1539 #endif
1540 }
1541
1542 void SpdySession::DcheckDraining() const {
1543   DcheckGoingAway();
1544   DCHECK_EQ(availability_state_, STATE_DRAINING);
1545   DCHECK(active_streams_.empty());
1546   DCHECK(unclaimed_pushed_streams_.empty());
1547 }
1548
1549 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
1550                                  Error status) {
1551   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1552
1553   // The loops below are carefully written to avoid reentrancy problems.
1554
1555   while (true) {
1556     size_t old_size = GetTotalSize(pending_create_stream_queues_);
1557     base::WeakPtr<SpdyStreamRequest> pending_request =
1558         GetNextPendingStreamRequest();
1559     if (!pending_request)
1560       break;
1561     // No new stream requests should be added while the session is
1562     // going away.
1563     DCHECK_GT(old_size, GetTotalSize(pending_create_stream_queues_));
1564     pending_request->OnRequestCompleteFailure(ERR_ABORTED);
1565   }
1566
1567   while (true) {
1568     size_t old_size = active_streams_.size();
1569     ActiveStreamMap::iterator it =
1570         active_streams_.lower_bound(last_good_stream_id + 1);
1571     if (it == active_streams_.end())
1572       break;
1573     LogAbandonedActiveStream(it, status);
1574     CloseActiveStreamIterator(it, status);
1575     // No new streams should be activated while the session is going
1576     // away.
1577     DCHECK_GT(old_size, active_streams_.size());
1578   }
1579
1580   while (!created_streams_.empty()) {
1581     size_t old_size = created_streams_.size();
1582     CreatedStreamSet::iterator it = created_streams_.begin();
1583     LogAbandonedStream(*it, status);
1584     CloseCreatedStreamIterator(it, status);
1585     // No new streams should be created while the session is going
1586     // away.
1587     DCHECK_GT(old_size, created_streams_.size());
1588   }
1589
1590   write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);
1591
1592   DcheckGoingAway();
1593 }
1594
1595 void SpdySession::MaybeFinishGoingAway() {
1596   if (active_streams_.empty() && availability_state_ == STATE_GOING_AWAY) {
1597     DoDrainSession(OK, "Finished going away");
1598   }
1599 }
1600
1601 void SpdySession::DoDrainSession(Error err, const std::string& description) {
1602   if (availability_state_ == STATE_DRAINING) {
1603     return;
1604   }
1605   MakeUnavailable();
1606
1607   // If |err| indicates an error occurred, inform the peer that we're closing
1608   // and why. Don't GOAWAY on a graceful or idle close, as that may
1609   // unnecessarily wake the radio. We could technically GOAWAY on network errors
1610   // (we'll probably fail to actually write it, but that's okay), however many
1611   // unit-tests would need to be updated.
1612   if (err != OK &&
1613       err != ERR_ABORTED &&  // Used by SpdySessionPool to close idle sessions.
1614       err != ERR_NETWORK_CHANGED &&  // Used to deprecate sessions on IP change.
1615       err != ERR_SOCKET_NOT_CONNECTED &&
1616       err != ERR_CONNECTION_CLOSED && err != ERR_CONNECTION_RESET) {
1617     // Enqueue a GOAWAY to inform the peer of why we're closing the connection.
1618     SpdyGoAwayIR goaway_ir(0,  // Last accepted stream ID.
1619                            MapNetErrorToGoAwayStatus(err),
1620                            description);
1621     EnqueueSessionWrite(HIGHEST,
1622                         GOAWAY,
1623                         scoped_ptr<SpdyFrame>(
1624                             buffered_spdy_framer_->SerializeFrame(goaway_ir)));
1625   }
1626
1627   availability_state_ = STATE_DRAINING;
1628   error_on_close_ = err;
1629
1630   net_log_.AddEvent(
1631       NetLog::TYPE_SPDY_SESSION_CLOSE,
1632       base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
1633
1634   UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
1635   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
1636                               total_bytes_received_, 1, 100000000, 50);
1637
1638   if (err == OK) {
1639     // We ought to be going away already, as this is a graceful close.
1640     DcheckGoingAway();
1641   } else {
1642     StartGoingAway(0, err);
1643   }
1644   DcheckDraining();
1645   MaybePostWriteLoop();
1646 }
1647
1648 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
1649   DCHECK(stream);
1650   std::string description = base::StringPrintf(
1651       "ABANDONED (stream_id=%d): ", stream->stream_id()) +
1652       stream->url().spec();
1653   stream->LogStreamError(status, description);
1654   // We don't increment the streams abandoned counter here. If the
1655   // stream isn't active (i.e., it hasn't written anything to the wire
1656   // yet) then it's as if it never existed. If it is active, then
1657   // LogAbandonedActiveStream() will increment the counters.
1658 }
1659
1660 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it,
1661                                            Error status) {
1662   DCHECK_GT(it->first, 0u);
1663   LogAbandonedStream(it->second.stream, status);
1664   ++streams_abandoned_count_;
1665   base::StatsCounter abandoned_streams("spdy.abandoned_streams");
1666   abandoned_streams.Increment();
1667   if (it->second.stream->type() == SPDY_PUSH_STREAM &&
1668       unclaimed_pushed_streams_.find(it->second.stream->url()) !=
1669       unclaimed_pushed_streams_.end()) {
1670     base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams");
1671     abandoned_push_streams.Increment();
1672   }
1673 }
1674
1675 SpdyStreamId SpdySession::GetNewStreamId() {
1676   CHECK_LE(stream_hi_water_mark_, kLastStreamId);
1677   SpdyStreamId id = stream_hi_water_mark_;
1678   stream_hi_water_mark_ += 2;
1679   return id;
1680 }
1681
1682 void SpdySession::CloseSessionOnError(Error err,
1683                                       const std::string& description) {
1684   DCHECK_LT(err, ERR_IO_PENDING);
1685   DoDrainSession(err, description);
1686 }
1687
1688 void SpdySession::MakeUnavailable() {
1689   if (availability_state_ == STATE_AVAILABLE) {
1690     availability_state_ = STATE_GOING_AWAY;
1691     pool_->MakeSessionUnavailable(GetWeakPtr());
1692   }
1693 }
1694
1695 base::Value* SpdySession::GetInfoAsValue() const {
1696   base::DictionaryValue* dict = new base::DictionaryValue();
1697
1698   dict->SetInteger("source_id", net_log_.source().id);
1699
1700   dict->SetString("host_port_pair", host_port_pair().ToString());
1701   if (!pooled_aliases_.empty()) {
1702     base::ListValue* alias_list = new base::ListValue();
1703     for (std::set<SpdySessionKey>::const_iterator it =
1704              pooled_aliases_.begin();
1705          it != pooled_aliases_.end(); it++) {
1706       alias_list->Append(new base::StringValue(
1707           it->host_port_pair().ToString()));
1708     }
1709     dict->Set("aliases", alias_list);
1710   }
1711   dict->SetString("proxy", host_port_proxy_pair().second.ToURI());
1712
1713   dict->SetInteger("active_streams", active_streams_.size());
1714
1715   dict->SetInteger("unclaimed_pushed_streams",
1716                    unclaimed_pushed_streams_.size());
1717
1718   dict->SetBoolean("is_secure", is_secure_);
1719
1720   dict->SetString("protocol_negotiated",
1721                   SSLClientSocket::NextProtoToString(
1722                       connection_->socket()->GetNegotiatedProtocol()));
1723
1724   dict->SetInteger("error", error_on_close_);
1725   dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
1726
1727   dict->SetInteger("streams_initiated_count", streams_initiated_count_);
1728   dict->SetInteger("streams_pushed_count", streams_pushed_count_);
1729   dict->SetInteger("streams_pushed_and_claimed_count",
1730       streams_pushed_and_claimed_count_);
1731   dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
1732   DCHECK(buffered_spdy_framer_.get());
1733   dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received());
1734
1735   dict->SetBoolean("sent_settings", sent_settings_);
1736   dict->SetBoolean("received_settings", received_settings_);
1737
1738   dict->SetInteger("send_window_size", session_send_window_size_);
1739   dict->SetInteger("recv_window_size", session_recv_window_size_);
1740   dict->SetInteger("unacked_recv_window_bytes",
1741                    session_unacked_recv_window_bytes_);
1742   return dict;
1743 }
1744
1745 bool SpdySession::IsReused() const {
1746   return buffered_spdy_framer_->frames_received() > 0 ||
1747       connection_->reuse_type() == ClientSocketHandle::UNUSED_IDLE;
1748 }
1749
1750 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id,
1751                                     LoadTimingInfo* load_timing_info) const {
1752   return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId,
1753                                         load_timing_info);
1754 }
1755
1756 int SpdySession::GetPeerAddress(IPEndPoint* address) const {
1757   int rv = ERR_SOCKET_NOT_CONNECTED;
1758   if (connection_->socket()) {
1759     rv = connection_->socket()->GetPeerAddress(address);
1760   }
1761
1762   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress",
1763                         rv == ERR_SOCKET_NOT_CONNECTED);
1764
1765   return rv;
1766 }
1767
1768 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
1769   int rv = ERR_SOCKET_NOT_CONNECTED;
1770   if (connection_->socket()) {
1771     rv = connection_->socket()->GetLocalAddress(address);
1772   }
1773
1774   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress",
1775                         rv == ERR_SOCKET_NOT_CONNECTED);
1776
1777   return rv;
1778 }
1779
1780 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
1781                                       SpdyFrameType frame_type,
1782                                       scoped_ptr<SpdyFrame> frame) {
1783   DCHECK(frame_type == RST_STREAM || frame_type == SETTINGS ||
1784          frame_type == WINDOW_UPDATE || frame_type == PING ||
1785          frame_type == GOAWAY);
1786   EnqueueWrite(
1787       priority, frame_type,
1788       scoped_ptr<SpdyBufferProducer>(
1789           new SimpleBufferProducer(
1790               scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
1791       base::WeakPtr<SpdyStream>());
1792 }
1793
1794 void SpdySession::EnqueueWrite(RequestPriority priority,
1795                                SpdyFrameType frame_type,
1796                                scoped_ptr<SpdyBufferProducer> producer,
1797                                const base::WeakPtr<SpdyStream>& stream) {
1798   if (availability_state_ == STATE_DRAINING)
1799     return;
1800
1801   write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
1802   MaybePostWriteLoop();
1803 }
1804
1805 void SpdySession::MaybePostWriteLoop() {
1806   if (write_state_ == WRITE_STATE_IDLE) {
1807     CHECK(!in_flight_write_);
1808     write_state_ = WRITE_STATE_DO_WRITE;
1809     base::MessageLoop::current()->PostTask(
1810         FROM_HERE,
1811         base::Bind(&SpdySession::PumpWriteLoop,
1812                    weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
1813   }
1814 }
1815
1816 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
1817   CHECK_EQ(stream->stream_id(), 0u);
1818   CHECK(created_streams_.find(stream.get()) == created_streams_.end());
1819   created_streams_.insert(stream.release());
1820 }
1821
1822 scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) {
1823   CHECK_EQ(stream->stream_id(), 0u);
1824   CHECK(created_streams_.find(stream) != created_streams_.end());
1825   stream->set_stream_id(GetNewStreamId());
1826   scoped_ptr<SpdyStream> owned_stream(stream);
1827   created_streams_.erase(stream);
1828   return owned_stream.Pass();
1829 }
1830
1831 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) {
1832   SpdyStreamId stream_id = stream->stream_id();
1833   CHECK_NE(stream_id, 0u);
1834   std::pair<ActiveStreamMap::iterator, bool> result =
1835       active_streams_.insert(
1836           std::make_pair(stream_id, ActiveStreamInfo(stream.get())));
1837   CHECK(result.second);
1838   ignore_result(stream.release());
1839 }
1840
1841 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
1842   if (in_flight_write_stream_.get() == stream.get()) {
1843     // If we're deleting the stream for the in-flight write, we still
1844     // need to let the write complete, so we clear
1845     // |in_flight_write_stream_| and let the write finish on its own
1846     // without notifying |in_flight_write_stream_|.
1847     in_flight_write_stream_.reset();
1848   }
1849
1850   write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
1851   stream->OnClose(status);
1852
1853   if (availability_state_ == STATE_AVAILABLE) {
1854     ProcessPendingStreamRequests();
1855   }
1856 }
1857
1858 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
1859   base::StatsCounter used_push_streams("spdy.claimed_push_streams");
1860
1861   PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
1862   if (unclaimed_it == unclaimed_pushed_streams_.end())
1863     return base::WeakPtr<SpdyStream>();
1864
1865   SpdyStreamId stream_id = unclaimed_it->second.stream_id;
1866   unclaimed_pushed_streams_.erase(unclaimed_it);
1867
1868   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
1869   if (active_it == active_streams_.end()) {
1870     NOTREACHED();
1871     return base::WeakPtr<SpdyStream>();
1872   }
1873
1874   net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM);
1875   used_push_streams.Increment();
1876   return active_it->second.stream->GetWeakPtr();
1877 }
1878
1879 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info,
1880                              bool* was_npn_negotiated,
1881                              NextProto* protocol_negotiated) {
1882   *was_npn_negotiated = connection_->socket()->WasNpnNegotiated();
1883   *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol();
1884   return connection_->socket()->GetSSLInfo(ssl_info);
1885 }
1886
1887 bool SpdySession::GetSSLCertRequestInfo(
1888     SSLCertRequestInfo* cert_request_info) {
1889   if (!is_secure_)
1890     return false;
1891   GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
1892   return true;
1893 }
1894
1895 void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
1896   CHECK(in_io_loop_);
1897
1898   RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code));
1899   std::string description =
1900       base::StringPrintf("Framer error: %d (%s).",
1901                          error_code,
1902                          SpdyFramer::ErrorCodeToString(error_code));
1903   DoDrainSession(MapFramerErrorToNetError(error_code), description);
1904 }
1905
1906 void SpdySession::OnStreamError(SpdyStreamId stream_id,
1907                                 const std::string& description) {
1908   CHECK(in_io_loop_);
1909
1910   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1911   if (it == active_streams_.end()) {
1912     // We still want to send a frame to reset the stream even if we
1913     // don't know anything about it.
1914     EnqueueResetStreamFrame(
1915         stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
1916     return;
1917   }
1918
1919   ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
1920 }
1921
1922 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id,
1923                                     size_t length,
1924                                     bool fin) {
1925   CHECK(in_io_loop_);
1926
1927   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1928
1929   // By the time data comes in, the stream may already be inactive.
1930   if (it == active_streams_.end())
1931     return;
1932
1933   SpdyStream* stream = it->second.stream;
1934   CHECK_EQ(stream->stream_id(), stream_id);
1935
1936   DCHECK(buffered_spdy_framer_);
1937   size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize();
1938   stream->IncrementRawReceivedBytes(header_len);
1939 }
1940
1941 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
1942                                     const char* data,
1943                                     size_t len,
1944                                     bool fin) {
1945   CHECK(in_io_loop_);
1946
1947   if (data == NULL && len != 0) {
1948     // This is notification of consumed data padding.
1949     // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames.
1950     // See crbug.com/353012.
1951     return;
1952   }
1953
1954   DCHECK_LT(len, 1u << 24);
1955   if (net_log().IsLogging()) {
1956     net_log().AddEvent(
1957         NetLog::TYPE_SPDY_SESSION_RECV_DATA,
1958         base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
1959   }
1960
1961   // Build the buffer as early as possible so that we go through the
1962   // session flow control checks and update
1963   // |unacked_recv_window_bytes_| properly even when the stream is
1964   // inactive (since the other side has still reduced its session send
1965   // window).
1966   scoped_ptr<SpdyBuffer> buffer;
1967   if (data) {
1968     DCHECK_GT(len, 0u);
1969     CHECK_LE(len, static_cast<size_t>(kReadBufferSize));
1970     buffer.reset(new SpdyBuffer(data, len));
1971
1972     if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1973       DecreaseRecvWindowSize(static_cast<int32>(len));
1974       buffer->AddConsumeCallback(
1975           base::Bind(&SpdySession::OnReadBufferConsumed,
1976                      weak_factory_.GetWeakPtr()));
1977     }
1978   } else {
1979     DCHECK_EQ(len, 0u);
1980   }
1981
1982   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1983
1984   // By the time data comes in, the stream may already be inactive.
1985   if (it == active_streams_.end())
1986     return;
1987
1988   SpdyStream* stream = it->second.stream;
1989   CHECK_EQ(stream->stream_id(), stream_id);
1990
1991   stream->IncrementRawReceivedBytes(len);
1992
1993   if (it->second.waiting_for_syn_reply) {
1994     const std::string& error = "Data received before SYN_REPLY.";
1995     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
1996     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
1997     return;
1998   }
1999
2000   stream->OnDataReceived(buffer.Pass());
2001 }
2002
2003 void SpdySession::OnSettings(bool clear_persisted) {
2004   CHECK(in_io_loop_);
2005
2006   if (clear_persisted)
2007     http_server_properties_->ClearSpdySettings(host_port_pair());
2008
2009   if (net_log_.IsLogging()) {
2010     net_log_.AddEvent(
2011         NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
2012         base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
2013                    clear_persisted));
2014   }
2015
2016   if (GetProtocolVersion() >= SPDY4) {
2017     // Send an acknowledgment of the setting.
2018     SpdySettingsIR settings_ir;
2019     settings_ir.set_is_ack(true);
2020     EnqueueSessionWrite(
2021         HIGHEST,
2022         SETTINGS,
2023         scoped_ptr<SpdyFrame>(
2024             buffered_spdy_framer_->SerializeFrame(settings_ir)));
2025   }
2026 }
2027
2028 void SpdySession::OnSetting(SpdySettingsIds id,
2029                             uint8 flags,
2030                             uint32 value) {
2031   CHECK(in_io_loop_);
2032
2033   HandleSetting(id, value);
2034   http_server_properties_->SetSpdySetting(
2035       host_port_pair(),
2036       id,
2037       static_cast<SpdySettingsFlags>(flags),
2038       value);
2039   received_settings_ = true;
2040
2041   // Log the setting.
2042   net_log_.AddEvent(
2043       NetLog::TYPE_SPDY_SESSION_RECV_SETTING,
2044       base::Bind(&NetLogSpdySettingCallback,
2045                  id, static_cast<SpdySettingsFlags>(flags), value));
2046 }
2047
2048 void SpdySession::OnSendCompressedFrame(
2049     SpdyStreamId stream_id,
2050     SpdyFrameType type,
2051     size_t payload_len,
2052     size_t frame_len) {
2053   if (type != SYN_STREAM)
2054     return;
2055
2056   DCHECK(buffered_spdy_framer_.get());
2057   size_t compressed_len =
2058       frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize();
2059
2060   if (payload_len) {
2061     // Make sure we avoid early decimal truncation.
2062     int compression_pct = 100 - (100 * compressed_len) / payload_len;
2063     UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
2064                              compression_pct);
2065   }
2066 }
2067
2068 void SpdySession::OnReceiveCompressedFrame(
2069     SpdyStreamId stream_id,
2070     SpdyFrameType type,
2071     size_t frame_len) {
2072   last_compressed_frame_len_ = frame_len;
2073 }
2074
2075 int SpdySession::OnInitialResponseHeadersReceived(
2076     const SpdyHeaderBlock& response_headers,
2077     base::Time response_time,
2078     base::TimeTicks recv_first_byte_time,
2079     SpdyStream* stream) {
2080   CHECK(in_io_loop_);
2081   SpdyStreamId stream_id = stream->stream_id();
2082   // May invalidate |stream|.
2083   int rv = stream->OnInitialResponseHeadersReceived(
2084       response_headers, response_time, recv_first_byte_time);
2085   if (rv < 0) {
2086     DCHECK_NE(rv, ERR_IO_PENDING);
2087     DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2088   }
2089   return rv;
2090 }
2091
2092 void SpdySession::OnSynStream(SpdyStreamId stream_id,
2093                               SpdyStreamId associated_stream_id,
2094                               SpdyPriority priority,
2095                               bool fin,
2096                               bool unidirectional,
2097                               const SpdyHeaderBlock& headers) {
2098   CHECK(in_io_loop_);
2099
2100   if (GetProtocolVersion() >= SPDY4) {
2101     DCHECK_EQ(0u, associated_stream_id);
2102     OnHeaders(stream_id, fin, headers);
2103     return;
2104   }
2105
2106   base::Time response_time = base::Time::Now();
2107   base::TimeTicks recv_first_byte_time = time_func_();
2108
2109   if (net_log_.IsLogging()) {
2110     net_log_.AddEvent(
2111         NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
2112         base::Bind(&NetLogSpdySynStreamReceivedCallback,
2113                    &headers, fin, unidirectional, priority,
2114                    stream_id, associated_stream_id));
2115   }
2116
2117   // Split headers to simulate push promise and response.
2118   SpdyHeaderBlock request_headers;
2119   SpdyHeaderBlock response_headers;
2120   SplitPushedHeadersToRequestAndResponse(
2121       headers, GetProtocolVersion(), &request_headers, &response_headers);
2122
2123   if (!TryCreatePushStream(
2124           stream_id, associated_stream_id, priority, request_headers))
2125     return;
2126
2127   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2128   if (active_it == active_streams_.end()) {
2129     NOTREACHED();
2130     return;
2131   }
2132
2133   if (OnInitialResponseHeadersReceived(response_headers,
2134                                        response_time,
2135                                        recv_first_byte_time,
2136                                        active_it->second.stream) != OK)
2137     return;
2138
2139   base::StatsCounter push_requests("spdy.pushed_streams");
2140   push_requests.Increment();
2141 }
2142
2143 void SpdySession::DeleteExpiredPushedStreams() {
2144   if (unclaimed_pushed_streams_.empty())
2145     return;
2146
2147   // Check that adequate time has elapsed since the last sweep.
2148   if (time_func_() < next_unclaimed_push_stream_sweep_time_)
2149     return;
2150
2151   // Gather old streams to delete.
2152   base::TimeTicks minimum_freshness = time_func_() -
2153       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2154   std::vector<SpdyStreamId> streams_to_close;
2155   for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
2156        it != unclaimed_pushed_streams_.end(); ++it) {
2157     if (minimum_freshness > it->second.creation_time)
2158       streams_to_close.push_back(it->second.stream_id);
2159   }
2160
2161   for (std::vector<SpdyStreamId>::const_iterator to_close_it =
2162            streams_to_close.begin();
2163        to_close_it != streams_to_close.end(); ++to_close_it) {
2164     ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it);
2165     if (active_it == active_streams_.end())
2166       continue;
2167
2168     LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM);
2169     // CloseActiveStreamIterator() will remove the stream from
2170     // |unclaimed_pushed_streams_|.
2171     ResetStreamIterator(
2172         active_it, RST_STREAM_REFUSED_STREAM, "Stream not claimed.");
2173   }
2174
2175   next_unclaimed_push_stream_sweep_time_ = time_func_() +
2176       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2177 }
2178
2179 void SpdySession::OnSynReply(SpdyStreamId stream_id,
2180                              bool fin,
2181                              const SpdyHeaderBlock& headers) {
2182   CHECK(in_io_loop_);
2183
2184   base::Time response_time = base::Time::Now();
2185   base::TimeTicks recv_first_byte_time = time_func_();
2186
2187   if (net_log().IsLogging()) {
2188     net_log().AddEvent(
2189         NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
2190         base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2191                    &headers, fin, stream_id));
2192   }
2193
2194   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2195   if (it == active_streams_.end()) {
2196     // NOTE:  it may just be that the stream was cancelled.
2197     return;
2198   }
2199
2200   SpdyStream* stream = it->second.stream;
2201   CHECK_EQ(stream->stream_id(), stream_id);
2202
2203   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2204   last_compressed_frame_len_ = 0;
2205
2206   if (GetProtocolVersion() >= SPDY4) {
2207     const std::string& error =
2208         "SPDY4 wasn't expecting SYN_REPLY.";
2209     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2210     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2211     return;
2212   }
2213   if (!it->second.waiting_for_syn_reply) {
2214     const std::string& error =
2215         "Received duplicate SYN_REPLY for stream.";
2216     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2217     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2218     return;
2219   }
2220   it->second.waiting_for_syn_reply = false;
2221
2222   ignore_result(OnInitialResponseHeadersReceived(
2223       headers, response_time, recv_first_byte_time, stream));
2224 }
2225
2226 void SpdySession::OnHeaders(SpdyStreamId stream_id,
2227                             bool fin,
2228                             const SpdyHeaderBlock& headers) {
2229   CHECK(in_io_loop_);
2230
2231   if (net_log().IsLogging()) {
2232     net_log().AddEvent(
2233         NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
2234         base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2235                    &headers, fin, stream_id));
2236   }
2237
2238   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2239   if (it == active_streams_.end()) {
2240     // NOTE:  it may just be that the stream was cancelled.
2241     LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
2242     return;
2243   }
2244
2245   SpdyStream* stream = it->second.stream;
2246   CHECK_EQ(stream->stream_id(), stream_id);
2247
2248   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2249   last_compressed_frame_len_ = 0;
2250
2251   base::Time response_time = base::Time::Now();
2252   base::TimeTicks recv_first_byte_time = time_func_();
2253
2254   if (it->second.waiting_for_syn_reply) {
2255     if (GetProtocolVersion() < SPDY4) {
2256       const std::string& error =
2257           "Was expecting SYN_REPLY, not HEADERS.";
2258       stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2259       ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2260       return;
2261     }
2262
2263     it->second.waiting_for_syn_reply = false;
2264     ignore_result(OnInitialResponseHeadersReceived(
2265         headers, response_time, recv_first_byte_time, stream));
2266   } else if (it->second.stream->IsReservedRemote()) {
2267     ignore_result(OnInitialResponseHeadersReceived(
2268         headers, response_time, recv_first_byte_time, stream));
2269   } else {
2270     int rv = stream->OnAdditionalResponseHeadersReceived(headers);
2271     if (rv < 0) {
2272       DCHECK_NE(rv, ERR_IO_PENDING);
2273       DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2274     }
2275   }
2276 }
2277
2278 void SpdySession::OnRstStream(SpdyStreamId stream_id,
2279                               SpdyRstStreamStatus status) {
2280   CHECK(in_io_loop_);
2281
2282   std::string description;
2283   net_log().AddEvent(
2284       NetLog::TYPE_SPDY_SESSION_RST_STREAM,
2285       base::Bind(&NetLogSpdyRstCallback,
2286                  stream_id, status, &description));
2287
2288   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2289   if (it == active_streams_.end()) {
2290     // NOTE:  it may just be that the stream was cancelled.
2291     LOG(WARNING) << "Received RST for invalid stream" << stream_id;
2292     return;
2293   }
2294
2295   CHECK_EQ(it->second.stream->stream_id(), stream_id);
2296
2297   if (status == 0) {
2298     it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
2299   } else if (status == RST_STREAM_REFUSED_STREAM) {
2300     CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM);
2301   } else {
2302     RecordProtocolErrorHistogram(
2303         PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
2304     it->second.stream->LogStreamError(
2305         ERR_SPDY_PROTOCOL_ERROR,
2306         base::StringPrintf("SPDY stream closed with status: %d", status));
2307     // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
2308     //                For now, it doesn't matter much - it is a protocol error.
2309     CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
2310   }
2311 }
2312
2313 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
2314                            SpdyGoAwayStatus status) {
2315   CHECK(in_io_loop_);
2316
2317   // TODO(jgraettinger): UMA histogram on |status|.
2318
2319   net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
2320       base::Bind(&NetLogSpdyGoAwayCallback,
2321                  last_accepted_stream_id,
2322                  active_streams_.size(),
2323                  unclaimed_pushed_streams_.size(),
2324                  status));
2325   MakeUnavailable();
2326   StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
2327   // This is to handle the case when we already don't have any active
2328   // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
2329   // active streams and so the last one being closed will finish the
2330   // going away process (see DeleteStream()).
2331   MaybeFinishGoingAway();
2332 }
2333
2334 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) {
2335   CHECK(in_io_loop_);
2336
2337   net_log_.AddEvent(
2338       NetLog::TYPE_SPDY_SESSION_PING,
2339       base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received"));
2340
2341   // Send response to a PING from server.
2342   if ((protocol_ >= kProtoSPDY4 && !is_ack) ||
2343       (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) {
2344     WritePingFrame(unique_id, true);
2345     return;
2346   }
2347
2348   --pings_in_flight_;
2349   if (pings_in_flight_ < 0) {
2350     RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
2351     DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
2352     pings_in_flight_ = 0;
2353     return;
2354   }
2355
2356   if (pings_in_flight_ > 0)
2357     return;
2358
2359   // We will record RTT in histogram when there are no more client sent
2360   // pings_in_flight_.
2361   RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
2362 }
2363
2364 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
2365                                  uint32 delta_window_size) {
2366   CHECK(in_io_loop_);
2367
2368   DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
2369   net_log_.AddEvent(
2370       NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
2371       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2372                  stream_id, delta_window_size));
2373
2374   if (stream_id == kSessionFlowControlStreamId) {
2375     // WINDOW_UPDATE for the session.
2376     if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
2377       LOG(WARNING) << "Received WINDOW_UPDATE for session when "
2378                    << "session flow control is not turned on";
2379       // TODO(akalin): Record an error and close the session.
2380       return;
2381     }
2382
2383     if (delta_window_size < 1u) {
2384       RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2385       DoDrainSession(
2386           ERR_SPDY_PROTOCOL_ERROR,
2387           "Received WINDOW_UPDATE with an invalid delta_window_size " +
2388               base::UintToString(delta_window_size));
2389       return;
2390     }
2391
2392     IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
2393   } else {
2394     // WINDOW_UPDATE for a stream.
2395     if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2396       // TODO(akalin): Record an error and close the session.
2397       LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
2398                    << " when flow control is not turned on";
2399       return;
2400     }
2401
2402     ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2403
2404     if (it == active_streams_.end()) {
2405       // NOTE:  it may just be that the stream was cancelled.
2406       LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
2407       return;
2408     }
2409
2410     SpdyStream* stream = it->second.stream;
2411     CHECK_EQ(stream->stream_id(), stream_id);
2412
2413     if (delta_window_size < 1u) {
2414       ResetStreamIterator(it,
2415                           RST_STREAM_FLOW_CONTROL_ERROR,
2416                           base::StringPrintf(
2417                               "Received WINDOW_UPDATE with an invalid "
2418                               "delta_window_size %ud", delta_window_size));
2419       return;
2420     }
2421
2422     CHECK_EQ(it->second.stream->stream_id(), stream_id);
2423     it->second.stream->IncreaseSendWindowSize(
2424         static_cast<int32>(delta_window_size));
2425   }
2426 }
2427
2428 bool SpdySession::TryCreatePushStream(SpdyStreamId stream_id,
2429                                       SpdyStreamId associated_stream_id,
2430                                       SpdyPriority priority,
2431                                       const SpdyHeaderBlock& headers) {
2432   // Server-initiated streams should have even sequence numbers.
2433   if ((stream_id & 0x1) != 0) {
2434     LOG(WARNING) << "Received invalid push stream id " << stream_id;
2435     return false;
2436   }
2437
2438   if (IsStreamActive(stream_id)) {
2439     LOG(WARNING) << "Received push for active stream " << stream_id;
2440     return false;
2441   }
2442
2443   RequestPriority request_priority =
2444       ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion());
2445
2446   if (availability_state_ == STATE_GOING_AWAY) {
2447     // TODO(akalin): This behavior isn't in the SPDY spec, although it
2448     // probably should be.
2449     EnqueueResetStreamFrame(stream_id,
2450                             request_priority,
2451                             RST_STREAM_REFUSED_STREAM,
2452                             "push stream request received when going away");
2453     return false;
2454   }
2455
2456   if (associated_stream_id == 0) {
2457     // In SPDY4 0 stream id in PUSH_PROMISE frame leads to framer error and
2458     // session going away. We should never get here.
2459     CHECK_GT(SPDY4, GetProtocolVersion());
2460     std::string description = base::StringPrintf(
2461         "Received invalid associated stream id %d for pushed stream %d",
2462         associated_stream_id,
2463         stream_id);
2464     EnqueueResetStreamFrame(
2465         stream_id, request_priority, RST_STREAM_REFUSED_STREAM, description);
2466     return false;
2467   }
2468
2469   streams_pushed_count_++;
2470
2471   // TODO(mbelshe): DCHECK that this is a GET method?
2472
2473   // Verify that the response had a URL for us.
2474   GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true);
2475   if (!gurl.is_valid()) {
2476     EnqueueResetStreamFrame(stream_id,
2477                             request_priority,
2478                             RST_STREAM_PROTOCOL_ERROR,
2479                             "Pushed stream url was invalid: " + gurl.spec());
2480     return false;
2481   }
2482
2483   // Verify we have a valid stream association.
2484   ActiveStreamMap::iterator associated_it =
2485       active_streams_.find(associated_stream_id);
2486   if (associated_it == active_streams_.end()) {
2487     EnqueueResetStreamFrame(
2488         stream_id,
2489         request_priority,
2490         RST_STREAM_INVALID_STREAM,
2491         base::StringPrintf("Received push for inactive associated stream %d",
2492                            associated_stream_id));
2493     return false;
2494   }
2495
2496   // Check that the pushed stream advertises the same origin as its associated
2497   // stream. Bypass this check if and only if this session is with a SPDY proxy
2498   // that is trusted explicitly via the --trusted-spdy-proxy switch.
2499   if (trusted_spdy_proxy_.Equals(host_port_pair())) {
2500     // Disallow pushing of HTTPS content.
2501     if (gurl.SchemeIs("https")) {
2502       EnqueueResetStreamFrame(
2503           stream_id,
2504           request_priority,
2505           RST_STREAM_REFUSED_STREAM,
2506           base::StringPrintf("Rejected push of Cross Origin HTTPS content %d",
2507                              associated_stream_id));
2508     }
2509   } else {
2510     GURL associated_url(associated_it->second.stream->GetUrlFromHeaders());
2511     if (associated_url.GetOrigin() != gurl.GetOrigin()) {
2512       EnqueueResetStreamFrame(
2513           stream_id,
2514           request_priority,
2515           RST_STREAM_REFUSED_STREAM,
2516           base::StringPrintf("Rejected Cross Origin Push Stream %d",
2517                              associated_stream_id));
2518       return false;
2519     }
2520   }
2521
2522   // There should not be an existing pushed stream with the same path.
2523   PushedStreamMap::iterator pushed_it =
2524       unclaimed_pushed_streams_.lower_bound(gurl);
2525   if (pushed_it != unclaimed_pushed_streams_.end() &&
2526       pushed_it->first == gurl) {
2527     EnqueueResetStreamFrame(
2528         stream_id,
2529         request_priority,
2530         RST_STREAM_PROTOCOL_ERROR,
2531         "Received duplicate pushed stream with url: " + gurl.spec());
2532     return false;
2533   }
2534
2535   scoped_ptr<SpdyStream> stream(new SpdyStream(SPDY_PUSH_STREAM,
2536                                                GetWeakPtr(),
2537                                                gurl,
2538                                                request_priority,
2539                                                stream_initial_send_window_size_,
2540                                                stream_initial_recv_window_size_,
2541                                                net_log_));
2542   stream->set_stream_id(stream_id);
2543
2544   // In spdy4/http2 PUSH_PROMISE arrives on associated stream.
2545   if (associated_it != active_streams_.end() && GetProtocolVersion() >= SPDY4) {
2546     associated_it->second.stream->IncrementRawReceivedBytes(
2547         last_compressed_frame_len_);
2548   } else {
2549     stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2550   }
2551
2552   last_compressed_frame_len_ = 0;
2553
2554   DeleteExpiredPushedStreams();
2555   PushedStreamMap::iterator inserted_pushed_it =
2556       unclaimed_pushed_streams_.insert(
2557           pushed_it,
2558           std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_())));
2559   DCHECK(inserted_pushed_it != pushed_it);
2560
2561   InsertActivatedStream(stream.Pass());
2562
2563   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2564   if (active_it == active_streams_.end()) {
2565     NOTREACHED();
2566     return false;
2567   }
2568
2569   active_it->second.stream->OnPushPromiseHeadersReceived(headers);
2570   DCHECK(active_it->second.stream->IsReservedRemote());
2571   return true;
2572 }
2573
2574 void SpdySession::OnPushPromise(SpdyStreamId stream_id,
2575                                 SpdyStreamId promised_stream_id,
2576                                 const SpdyHeaderBlock& headers) {
2577   CHECK(in_io_loop_);
2578
2579   if (net_log_.IsLogging()) {
2580     net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_PUSH_PROMISE,
2581                       base::Bind(&NetLogSpdyPushPromiseReceivedCallback,
2582                                  &headers,
2583                                  stream_id,
2584                                  promised_stream_id));
2585   }
2586
2587   // Any priority will do.
2588   // TODO(baranovich): pass parent stream id priority?
2589   if (!TryCreatePushStream(promised_stream_id, stream_id, 0, headers))
2590     return;
2591
2592   base::StatsCounter push_requests("spdy.pushed_streams");
2593   push_requests.Increment();
2594 }
2595
2596 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id,
2597                                          uint32 delta_window_size) {
2598   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2599   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2600   CHECK(it != active_streams_.end());
2601   CHECK_EQ(it->second.stream->stream_id(), stream_id);
2602   SendWindowUpdateFrame(
2603       stream_id, delta_window_size, it->second.stream->priority());
2604 }
2605
2606 void SpdySession::SendInitialData() {
2607   DCHECK(enable_sending_initial_data_);
2608
2609   if (send_connection_header_prefix_) {
2610     DCHECK_EQ(protocol_, kProtoSPDY4);
2611     scoped_ptr<SpdyFrame> connection_header_prefix_frame(
2612         new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix),
2613                       kHttp2ConnectionHeaderPrefixSize,
2614                       false /* take_ownership */));
2615     // Count the prefix as part of the subsequent SETTINGS frame.
2616     EnqueueSessionWrite(HIGHEST, SETTINGS,
2617                         connection_header_prefix_frame.Pass());
2618   }
2619
2620   // First, notify the server about the settings they should use when
2621   // communicating with us.
2622   SettingsMap settings_map;
2623   // Create a new settings frame notifying the server of our
2624   // max concurrent streams and initial window size.
2625   settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] =
2626       SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams);
2627   if (flow_control_state_ >= FLOW_CONTROL_STREAM &&
2628       stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) {
2629     settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
2630         SettingsFlagsAndValue(SETTINGS_FLAG_NONE,
2631                               stream_initial_recv_window_size_);
2632   }
2633   SendSettings(settings_map);
2634
2635   // Next, notify the server about our initial recv window size.
2636   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2637     // Bump up the receive window size to the real initial value. This
2638     // has to go here since the WINDOW_UPDATE frame sent by
2639     // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|.
2640     DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_);
2641     // This condition implies that |kDefaultInitialRecvWindowSize| -
2642     // |session_recv_window_size_| doesn't overflow.
2643     DCHECK_GT(session_recv_window_size_, 0);
2644     IncreaseRecvWindowSize(
2645         kDefaultInitialRecvWindowSize - session_recv_window_size_);
2646   }
2647
2648   // Finally, notify the server about the settings they have
2649   // previously told us to use when communicating with them (after
2650   // applying them).
2651   const SettingsMap& server_settings_map =
2652       http_server_properties_->GetSpdySettings(host_port_pair());
2653   if (server_settings_map.empty())
2654     return;
2655
2656   SettingsMap::const_iterator it =
2657       server_settings_map.find(SETTINGS_CURRENT_CWND);
2658   uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0;
2659   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100);
2660
2661   for (SettingsMap::const_iterator it = server_settings_map.begin();
2662        it != server_settings_map.end(); ++it) {
2663     const SpdySettingsIds new_id = it->first;
2664     const uint32 new_val = it->second.second;
2665     HandleSetting(new_id, new_val);
2666   }
2667
2668   SendSettings(server_settings_map);
2669 }
2670
2671
2672 void SpdySession::SendSettings(const SettingsMap& settings) {
2673   net_log_.AddEvent(
2674       NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
2675       base::Bind(&NetLogSpdySendSettingsCallback, &settings));
2676
2677   // Create the SETTINGS frame and send it.
2678   DCHECK(buffered_spdy_framer_.get());
2679   scoped_ptr<SpdyFrame> settings_frame(
2680       buffered_spdy_framer_->CreateSettings(settings));
2681   sent_settings_ = true;
2682   EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
2683 }
2684
2685 void SpdySession::HandleSetting(uint32 id, uint32 value) {
2686   switch (id) {
2687     case SETTINGS_MAX_CONCURRENT_STREAMS:
2688       max_concurrent_streams_ = std::min(static_cast<size_t>(value),
2689                                          kMaxConcurrentStreamLimit);
2690       ProcessPendingStreamRequests();
2691       break;
2692     case SETTINGS_INITIAL_WINDOW_SIZE: {
2693       if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2694         net_log().AddEvent(
2695             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL);
2696         return;
2697       }
2698
2699       if (value > static_cast<uint32>(kint32max)) {
2700         net_log().AddEvent(
2701             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE,
2702             NetLog::IntegerCallback("initial_window_size", value));
2703         return;
2704       }
2705
2706       // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only.
2707       int32 delta_window_size =
2708           static_cast<int32>(value) - stream_initial_send_window_size_;
2709       stream_initial_send_window_size_ = static_cast<int32>(value);
2710       UpdateStreamsSendWindowSize(delta_window_size);
2711       net_log().AddEvent(
2712           NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE,
2713           NetLog::IntegerCallback("delta_window_size", delta_window_size));
2714       break;
2715     }
2716   }
2717 }
2718
2719 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
2720   DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2721   for (ActiveStreamMap::iterator it = active_streams_.begin();
2722        it != active_streams_.end(); ++it) {
2723     it->second.stream->AdjustSendWindowSize(delta_window_size);
2724   }
2725
2726   for (CreatedStreamSet::const_iterator it = created_streams_.begin();
2727        it != created_streams_.end(); it++) {
2728     (*it)->AdjustSendWindowSize(delta_window_size);
2729   }
2730 }
2731
2732 void SpdySession::SendPrefacePingIfNoneInFlight() {
2733   if (pings_in_flight_ || !enable_ping_based_connection_checking_)
2734     return;
2735
2736   base::TimeTicks now = time_func_();
2737   // If there is no activity in the session, then send a preface-PING.
2738   if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
2739     SendPrefacePing();
2740 }
2741
2742 void SpdySession::SendPrefacePing() {
2743   WritePingFrame(next_ping_id_, false);
2744 }
2745
2746 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
2747                                         uint32 delta_window_size,
2748                                         RequestPriority priority) {
2749   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2750   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2751   if (it != active_streams_.end()) {
2752     CHECK_EQ(it->second.stream->stream_id(), stream_id);
2753   } else {
2754     CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2755     CHECK_EQ(stream_id, kSessionFlowControlStreamId);
2756   }
2757
2758   net_log_.AddEvent(
2759       NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
2760       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2761                  stream_id, delta_window_size));
2762
2763   DCHECK(buffered_spdy_framer_.get());
2764   scoped_ptr<SpdyFrame> window_update_frame(
2765       buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
2766   EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass());
2767 }
2768
2769 void SpdySession::WritePingFrame(uint32 unique_id, bool is_ack) {
2770   DCHECK(buffered_spdy_framer_.get());
2771   scoped_ptr<SpdyFrame> ping_frame(
2772       buffered_spdy_framer_->CreatePingFrame(unique_id, is_ack));
2773   EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass());
2774
2775   if (net_log().IsLogging()) {
2776     net_log().AddEvent(
2777         NetLog::TYPE_SPDY_SESSION_PING,
2778         base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "sent"));
2779   }
2780   if (!is_ack) {
2781     next_ping_id_ += 2;
2782     ++pings_in_flight_;
2783     PlanToCheckPingStatus();
2784     last_ping_sent_time_ = time_func_();
2785   }
2786 }
2787
2788 void SpdySession::PlanToCheckPingStatus() {
2789   if (check_ping_status_pending_)
2790     return;
2791
2792   check_ping_status_pending_ = true;
2793   base::MessageLoop::current()->PostDelayedTask(
2794       FROM_HERE,
2795       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2796                  time_func_()), hung_interval_);
2797 }
2798
2799 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
2800   CHECK(!in_io_loop_);
2801
2802   // Check if we got a response back for all PINGs we had sent.
2803   if (pings_in_flight_ == 0) {
2804     check_ping_status_pending_ = false;
2805     return;
2806   }
2807
2808   DCHECK(check_ping_status_pending_);
2809
2810   base::TimeTicks now = time_func_();
2811   base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
2812
2813   if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
2814     // Track all failed PING messages in a separate bucket.
2815     RecordPingRTTHistogram(base::TimeDelta::Max());
2816     DoDrainSession(ERR_SPDY_PING_FAILED, "Failed ping.");
2817     return;
2818   }
2819
2820   // Check the status of connection after a delay.
2821   base::MessageLoop::current()->PostDelayedTask(
2822       FROM_HERE,
2823       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2824                  now),
2825       delay);
2826 }
2827
2828 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) {
2829   UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration);
2830 }
2831
2832 void SpdySession::RecordProtocolErrorHistogram(
2833     SpdyProtocolErrorDetails details) {
2834   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details,
2835                             NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2836   if (EndsWith(host_port_pair().host(), "google.com", false)) {
2837     UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details,
2838                               NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2839   }
2840 }
2841
2842 void SpdySession::RecordHistograms() {
2843   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
2844                               streams_initiated_count_,
2845                               0, 300, 50);
2846   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
2847                               streams_pushed_count_,
2848                               0, 300, 50);
2849   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
2850                               streams_pushed_and_claimed_count_,
2851                               0, 300, 50);
2852   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
2853                               streams_abandoned_count_,
2854                               0, 300, 50);
2855   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
2856                             sent_settings_ ? 1 : 0, 2);
2857   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
2858                             received_settings_ ? 1 : 0, 2);
2859   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
2860                               stalled_streams_,
2861                               0, 300, 50);
2862   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
2863                             stalled_streams_ > 0 ? 1 : 0, 2);
2864
2865   if (received_settings_) {
2866     // Enumerate the saved settings, and set histograms for it.
2867     const SettingsMap& settings_map =
2868         http_server_properties_->GetSpdySettings(host_port_pair());
2869
2870     SettingsMap::const_iterator it;
2871     for (it = settings_map.begin(); it != settings_map.end(); ++it) {
2872       const SpdySettingsIds id = it->first;
2873       const uint32 val = it->second.second;
2874       switch (id) {
2875         case SETTINGS_CURRENT_CWND:
2876           // Record several different histograms to see if cwnd converges
2877           // for larger volumes of data being sent.
2878           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
2879                                       val, 1, 200, 100);
2880           if (total_bytes_received_ > 10 * 1024) {
2881             UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
2882                                         val, 1, 200, 100);
2883             if (total_bytes_received_ > 25 * 1024) {
2884               UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
2885                                           val, 1, 200, 100);
2886               if (total_bytes_received_ > 50 * 1024) {
2887                 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
2888                                             val, 1, 200, 100);
2889                 if (total_bytes_received_ > 100 * 1024) {
2890                   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
2891                                               val, 1, 200, 100);
2892                 }
2893               }
2894             }
2895           }
2896           break;
2897         case SETTINGS_ROUND_TRIP_TIME:
2898           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
2899                                       val, 1, 1200, 100);
2900           break;
2901         case SETTINGS_DOWNLOAD_RETRANS_RATE:
2902           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
2903                                       val, 1, 100, 50);
2904           break;
2905         default:
2906           break;
2907       }
2908     }
2909   }
2910 }
2911
2912 void SpdySession::CompleteStreamRequest(
2913     const base::WeakPtr<SpdyStreamRequest>& pending_request) {
2914   // Abort if the request has already been cancelled.
2915   if (!pending_request)
2916     return;
2917
2918   base::WeakPtr<SpdyStream> stream;
2919   int rv = TryCreateStream(pending_request, &stream);
2920
2921   if (rv == OK) {
2922     DCHECK(stream);
2923     pending_request->OnRequestCompleteSuccess(stream);
2924     return;
2925   }
2926   DCHECK(!stream);
2927
2928   if (rv != ERR_IO_PENDING) {
2929     pending_request->OnRequestCompleteFailure(rv);
2930   }
2931 }
2932
2933 SSLClientSocket* SpdySession::GetSSLClientSocket() const {
2934   if (!is_secure_)
2935     return NULL;
2936   SSLClientSocket* ssl_socket =
2937       reinterpret_cast<SSLClientSocket*>(connection_->socket());
2938   DCHECK(ssl_socket);
2939   return ssl_socket;
2940 }
2941
2942 void SpdySession::OnWriteBufferConsumed(
2943     size_t frame_payload_size,
2944     size_t consume_size,
2945     SpdyBuffer::ConsumeSource consume_source) {
2946   // We can be called with |in_io_loop_| set if a write SpdyBuffer is
2947   // deleted (e.g., a stream is closed due to incoming data).
2948
2949   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2950
2951   if (consume_source == SpdyBuffer::DISCARD) {
2952     // If we're discarding a frame or part of it, increase the send
2953     // window by the number of discarded bytes. (Although if we're
2954     // discarding part of a frame, it's probably because of a write
2955     // error and we'll be tearing down the session soon.)
2956     size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
2957     DCHECK_GT(remaining_payload_bytes, 0u);
2958     IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
2959   }
2960   // For consumed bytes, the send window is increased when we receive
2961   // a WINDOW_UPDATE frame.
2962 }
2963
2964 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
2965   // We can be called with |in_io_loop_| set if a SpdyBuffer is
2966   // deleted (e.g., a stream is closed due to incoming data).
2967
2968   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2969   DCHECK_GE(delta_window_size, 1);
2970
2971   // Check for overflow.
2972   int32 max_delta_window_size = kint32max - session_send_window_size_;
2973   if (delta_window_size > max_delta_window_size) {
2974     RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2975     DoDrainSession(
2976         ERR_SPDY_PROTOCOL_ERROR,
2977         "Received WINDOW_UPDATE [delta: " +
2978             base::IntToString(delta_window_size) +
2979             "] for session overflows session_send_window_size_ [current: " +
2980             base::IntToString(session_send_window_size_) + "]");
2981     return;
2982   }
2983
2984   session_send_window_size_ += delta_window_size;
2985
2986   net_log_.AddEvent(
2987       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
2988       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2989                  delta_window_size, session_send_window_size_));
2990
2991   DCHECK(!IsSendStalled());
2992   ResumeSendStalledStreams();
2993 }
2994
2995 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
2996   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2997
2998   // We only call this method when sending a frame. Therefore,
2999   // |delta_window_size| should be within the valid frame size range.
3000   DCHECK_GE(delta_window_size, 1);
3001   DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
3002
3003   // |send_window_size_| should have been at least |delta_window_size| for
3004   // this call to happen.
3005   DCHECK_GE(session_send_window_size_, delta_window_size);
3006
3007   session_send_window_size_ -= delta_window_size;
3008
3009   net_log_.AddEvent(
3010       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
3011       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3012                  -delta_window_size, session_send_window_size_));
3013 }
3014
3015 void SpdySession::OnReadBufferConsumed(
3016     size_t consume_size,
3017     SpdyBuffer::ConsumeSource consume_source) {
3018   // We can be called with |in_io_loop_| set if a read SpdyBuffer is
3019   // deleted (e.g., discarded by a SpdyReadQueue).
3020
3021   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3022   DCHECK_GE(consume_size, 1u);
3023   DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
3024
3025   IncreaseRecvWindowSize(static_cast<int32>(consume_size));
3026 }
3027
3028 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
3029   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3030   DCHECK_GE(session_unacked_recv_window_bytes_, 0);
3031   DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
3032   DCHECK_GE(delta_window_size, 1);
3033   // Check for overflow.
3034   DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
3035
3036   session_recv_window_size_ += delta_window_size;
3037   net_log_.AddEvent(
3038       NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
3039       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3040                  delta_window_size, session_recv_window_size_));
3041
3042   session_unacked_recv_window_bytes_ += delta_window_size;
3043   if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) {
3044     SendWindowUpdateFrame(kSessionFlowControlStreamId,
3045                           session_unacked_recv_window_bytes_,
3046                           HIGHEST);
3047     session_unacked_recv_window_bytes_ = 0;
3048   }
3049 }
3050
3051 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
3052   CHECK(in_io_loop_);
3053   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3054   DCHECK_GE(delta_window_size, 1);
3055
3056   // Since we never decrease the initial receive window size,
3057   // |delta_window_size| should never cause |recv_window_size_| to go
3058   // negative. If we do, the receive window isn't being respected.
3059   if (delta_window_size > session_recv_window_size_) {
3060     RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
3061     DoDrainSession(
3062         ERR_SPDY_FLOW_CONTROL_ERROR,
3063         "delta_window_size is " + base::IntToString(delta_window_size) +
3064             " in DecreaseRecvWindowSize, which is larger than the receive " +
3065             "window size of " + base::IntToString(session_recv_window_size_));
3066     return;
3067   }
3068
3069   session_recv_window_size_ -= delta_window_size;
3070   net_log_.AddEvent(
3071       NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
3072       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3073                  -delta_window_size, session_recv_window_size_));
3074 }
3075
3076 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
3077   DCHECK(stream.send_stalled_by_flow_control());
3078   RequestPriority priority = stream.priority();
3079   CHECK_GE(priority, MINIMUM_PRIORITY);
3080   CHECK_LE(priority, MAXIMUM_PRIORITY);
3081   stream_send_unstall_queue_[priority].push_back(stream.stream_id());
3082 }
3083
3084 void SpdySession::ResumeSendStalledStreams() {
3085   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3086
3087   // We don't have to worry about new streams being queued, since
3088   // doing so would cause IsSendStalled() to return true. But we do
3089   // have to worry about streams being closed, as well as ourselves
3090   // being closed.
3091
3092   while (!IsSendStalled()) {
3093     size_t old_size = 0;
3094 #if DCHECK_IS_ON
3095     old_size = GetTotalSize(stream_send_unstall_queue_);
3096 #endif
3097
3098     SpdyStreamId stream_id = PopStreamToPossiblyResume();
3099     if (stream_id == 0)
3100       break;
3101     ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
3102     // The stream may actually still be send-stalled after this (due
3103     // to its own send window) but that's okay -- it'll then be
3104     // resumed once its send window increases.
3105     if (it != active_streams_.end())
3106       it->second.stream->PossiblyResumeIfSendStalled();
3107
3108     // The size should decrease unless we got send-stalled again.
3109     if (!IsSendStalled())
3110       DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
3111   }
3112 }
3113
3114 SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
3115   for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
3116     std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
3117     if (!queue->empty()) {
3118       SpdyStreamId stream_id = queue->front();
3119       queue->pop_front();
3120       return stream_id;
3121     }
3122   }
3123   return 0;
3124 }
3125
3126 }  // namespace net