Upstream version 7.36.149.0
[platform/framework/web/crosswalk.git] / src / net / spdy / spdy_session.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/spdy_session.h"
6
7 #include <algorithm>
8 #include <map>
9
10 #include "base/basictypes.h"
11 #include "base/bind.h"
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/metrics/field_trial.h"
16 #include "base/metrics/histogram.h"
17 #include "base/metrics/sparse_histogram.h"
18 #include "base/metrics/stats_counters.h"
19 #include "base/stl_util.h"
20 #include "base/strings/string_number_conversions.h"
21 #include "base/strings/string_util.h"
22 #include "base/strings/stringprintf.h"
23 #include "base/strings/utf_string_conversions.h"
24 #include "base/time/time.h"
25 #include "base/values.h"
26 #include "crypto/ec_private_key.h"
27 #include "crypto/ec_signature_creator.h"
28 #include "net/base/connection_type_histograms.h"
29 #include "net/base/net_log.h"
30 #include "net/base/net_util.h"
31 #include "net/cert/asn1_util.h"
32 #include "net/http/http_log_util.h"
33 #include "net/http/http_network_session.h"
34 #include "net/http/http_server_properties.h"
35 #include "net/http/http_util.h"
36 #include "net/spdy/spdy_buffer_producer.h"
37 #include "net/spdy/spdy_frame_builder.h"
38 #include "net/spdy/spdy_http_utils.h"
39 #include "net/spdy/spdy_protocol.h"
40 #include "net/spdy/spdy_session_pool.h"
41 #include "net/spdy/spdy_stream.h"
42 #include "net/ssl/server_bound_cert_service.h"
43
44 namespace net {
45
46 namespace {
47
48 const int kReadBufferSize = 8 * 1024;
49 const int kDefaultConnectionAtRiskOfLossSeconds = 10;
50 const int kHungIntervalSeconds = 10;
51
52 // Always start at 1 for the first stream id.
53 const SpdyStreamId kFirstStreamId = 1;
54
55 // Minimum seconds that unclaimed pushed streams will be kept in memory.
56 const int kMinPushedStreamLifetimeSeconds = 300;
57
58 scoped_ptr<base::ListValue> SpdyHeaderBlockToListValue(
59     const SpdyHeaderBlock& headers,
60     net::NetLog::LogLevel log_level) {
61   scoped_ptr<base::ListValue> headers_list(new base::ListValue());
62   for (SpdyHeaderBlock::const_iterator it = headers.begin();
63        it != headers.end(); ++it) {
64     headers_list->AppendString(
65         it->first + ": " +
66         ElideHeaderValueForNetLog(log_level, it->first, it->second));
67   }
68   return headers_list.Pass();
69 }
70
71 base::Value* NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock* headers,
72                                              bool fin,
73                                              bool unidirectional,
74                                              SpdyPriority spdy_priority,
75                                              SpdyStreamId stream_id,
76                                              NetLog::LogLevel log_level) {
77   base::DictionaryValue* dict = new base::DictionaryValue();
78   dict->Set("headers",
79             SpdyHeaderBlockToListValue(*headers, log_level).release());
80   dict->SetBoolean("fin", fin);
81   dict->SetBoolean("unidirectional", unidirectional);
82   dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
83   dict->SetInteger("stream_id", stream_id);
84   return dict;
85 }
86
87 base::Value* NetLogSpdySynStreamReceivedCallback(
88     const SpdyHeaderBlock* headers,
89     bool fin,
90     bool unidirectional,
91     SpdyPriority spdy_priority,
92     SpdyStreamId stream_id,
93     SpdyStreamId associated_stream,
94     NetLog::LogLevel log_level) {
95   base::DictionaryValue* dict = new base::DictionaryValue();
96   dict->Set("headers",
97             SpdyHeaderBlockToListValue(*headers, log_level).release());
98   dict->SetBoolean("fin", fin);
99   dict->SetBoolean("unidirectional", unidirectional);
100   dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
101   dict->SetInteger("stream_id", stream_id);
102   dict->SetInteger("associated_stream", associated_stream);
103   return dict;
104 }
105
106 base::Value* NetLogSpdySynReplyOrHeadersReceivedCallback(
107     const SpdyHeaderBlock* headers,
108     bool fin,
109     SpdyStreamId stream_id,
110     NetLog::LogLevel log_level) {
111   base::DictionaryValue* dict = new base::DictionaryValue();
112   dict->Set("headers",
113             SpdyHeaderBlockToListValue(*headers, log_level).release());
114   dict->SetBoolean("fin", fin);
115   dict->SetInteger("stream_id", stream_id);
116   return dict;
117 }
118
119 base::Value* NetLogSpdySessionCloseCallback(int net_error,
120                                             const std::string* description,
121                                             NetLog::LogLevel /* log_level */) {
122   base::DictionaryValue* dict = new base::DictionaryValue();
123   dict->SetInteger("net_error", net_error);
124   dict->SetString("description", *description);
125   return dict;
126 }
127
128 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair,
129                                        NetLog::LogLevel /* log_level */) {
130   base::DictionaryValue* dict = new base::DictionaryValue();
131   dict->SetString("host", host_pair->first.ToString());
132   dict->SetString("proxy", host_pair->second.ToPacString());
133   return dict;
134 }
135
136 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair,
137                                         bool clear_persisted,
138                                         NetLog::LogLevel /* log_level */) {
139   base::DictionaryValue* dict = new base::DictionaryValue();
140   dict->SetString("host", host_port_pair.ToString());
141   dict->SetBoolean("clear_persisted", clear_persisted);
142   return dict;
143 }
144
145 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id,
146                                        SpdySettingsFlags flags,
147                                        uint32 value,
148                                        NetLog::LogLevel /* log_level */) {
149   base::DictionaryValue* dict = new base::DictionaryValue();
150   dict->SetInteger("id", id);
151   dict->SetInteger("flags", flags);
152   dict->SetInteger("value", value);
153   return dict;
154 }
155
156 base::Value* NetLogSpdySendSettingsCallback(const SettingsMap* settings,
157                                             NetLog::LogLevel /* log_level */) {
158   base::DictionaryValue* dict = new base::DictionaryValue();
159   base::ListValue* settings_list = new base::ListValue();
160   for (SettingsMap::const_iterator it = settings->begin();
161        it != settings->end(); ++it) {
162     const SpdySettingsIds id = it->first;
163     const SpdySettingsFlags flags = it->second.first;
164     const uint32 value = it->second.second;
165     settings_list->Append(new base::StringValue(
166         base::StringPrintf("[id:%u flags:%u value:%u]", id, flags, value)));
167   }
168   dict->Set("settings", settings_list);
169   return dict;
170 }
171
172 base::Value* NetLogSpdyWindowUpdateFrameCallback(
173     SpdyStreamId stream_id,
174     uint32 delta,
175     NetLog::LogLevel /* log_level */) {
176   base::DictionaryValue* dict = new base::DictionaryValue();
177   dict->SetInteger("stream_id", static_cast<int>(stream_id));
178   dict->SetInteger("delta", delta);
179   return dict;
180 }
181
182 base::Value* NetLogSpdySessionWindowUpdateCallback(
183     int32 delta,
184     int32 window_size,
185     NetLog::LogLevel /* log_level */) {
186   base::DictionaryValue* dict = new base::DictionaryValue();
187   dict->SetInteger("delta", delta);
188   dict->SetInteger("window_size", window_size);
189   return dict;
190 }
191
192 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id,
193                                     int size,
194                                     bool fin,
195                                     NetLog::LogLevel /* log_level */) {
196   base::DictionaryValue* dict = new base::DictionaryValue();
197   dict->SetInteger("stream_id", static_cast<int>(stream_id));
198   dict->SetInteger("size", size);
199   dict->SetBoolean("fin", fin);
200   return dict;
201 }
202
203 base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id,
204                                    int status,
205                                    const std::string* description,
206                                    NetLog::LogLevel /* log_level */) {
207   base::DictionaryValue* dict = new base::DictionaryValue();
208   dict->SetInteger("stream_id", static_cast<int>(stream_id));
209   dict->SetInteger("status", status);
210   dict->SetString("description", *description);
211   return dict;
212 }
213
214 base::Value* NetLogSpdyPingCallback(SpdyPingId unique_id,
215                                     bool is_ack,
216                                     const char* type,
217                                     NetLog::LogLevel /* log_level */) {
218   base::DictionaryValue* dict = new base::DictionaryValue();
219   dict->SetInteger("unique_id", unique_id);
220   dict->SetString("type", type);
221   dict->SetBoolean("is_ack", is_ack);
222   return dict;
223 }
224
225 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,
226                                       int active_streams,
227                                       int unclaimed_streams,
228                                       SpdyGoAwayStatus status,
229                                       NetLog::LogLevel /* log_level */) {
230   base::DictionaryValue* dict = new base::DictionaryValue();
231   dict->SetInteger("last_accepted_stream_id",
232                    static_cast<int>(last_stream_id));
233   dict->SetInteger("active_streams", active_streams);
234   dict->SetInteger("unclaimed_streams", unclaimed_streams);
235   dict->SetInteger("status", static_cast<int>(status));
236   return dict;
237 }
238
// Sums the .size() of every element in the fixed-size array |arr| and returns
// the total.
template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
  size_t sum = 0;
  const T* const end = arr + N;
  for (const T* element = arr; element != end; ++element)
    sum += element->size();
  return sum;
}
248
249 // Helper class for std:find_if on STL container containing
250 // SpdyStreamRequest weak pointers.
251 class RequestEquals {
252  public:
253   RequestEquals(const base::WeakPtr<SpdyStreamRequest>& request)
254       : request_(request) {}
255
256   bool operator()(const base::WeakPtr<SpdyStreamRequest>& request) const {
257     return request_.get() == request.get();
258   }
259
260  private:
261   const base::WeakPtr<SpdyStreamRequest> request_;
262 };
263
// Hard upper bound on the number of concurrent streams we will ever create;
// it is never exceeded, even when the server would permit more.
const size_t kMaxConcurrentStreamLimit = 256;
267
268 }  // namespace
269
270 SpdyProtocolErrorDetails MapFramerErrorToProtocolError(
271     SpdyFramer::SpdyError err) {
272   switch(err) {
273     case SpdyFramer::SPDY_NO_ERROR:
274       return SPDY_ERROR_NO_ERROR;
275     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
276       return SPDY_ERROR_INVALID_CONTROL_FRAME;
277     case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
278       return SPDY_ERROR_CONTROL_PAYLOAD_TOO_LARGE;
279     case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
280       return SPDY_ERROR_ZLIB_INIT_FAILURE;
281     case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
282       return SPDY_ERROR_UNSUPPORTED_VERSION;
283     case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
284       return SPDY_ERROR_DECOMPRESS_FAILURE;
285     case SpdyFramer::SPDY_COMPRESS_FAILURE:
286       return SPDY_ERROR_COMPRESS_FAILURE;
287     case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
288       return SPDY_ERROR_GOAWAY_FRAME_CORRUPT;
289     case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
290       return SPDY_ERROR_RST_STREAM_FRAME_CORRUPT;
291     case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
292       return SPDY_ERROR_INVALID_DATA_FRAME_FLAGS;
293     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
294       return SPDY_ERROR_INVALID_CONTROL_FRAME_FLAGS;
295     case SpdyFramer::SPDY_UNEXPECTED_FRAME:
296       return SPDY_ERROR_UNEXPECTED_FRAME;
297     default:
298       NOTREACHED();
299       return static_cast<SpdyProtocolErrorDetails>(-1);
300   }
301 }
302
303 SpdyProtocolErrorDetails MapRstStreamStatusToProtocolError(
304     SpdyRstStreamStatus status) {
305   switch(status) {
306     case RST_STREAM_PROTOCOL_ERROR:
307       return STATUS_CODE_PROTOCOL_ERROR;
308     case RST_STREAM_INVALID_STREAM:
309       return STATUS_CODE_INVALID_STREAM;
310     case RST_STREAM_REFUSED_STREAM:
311       return STATUS_CODE_REFUSED_STREAM;
312     case RST_STREAM_UNSUPPORTED_VERSION:
313       return STATUS_CODE_UNSUPPORTED_VERSION;
314     case RST_STREAM_CANCEL:
315       return STATUS_CODE_CANCEL;
316     case RST_STREAM_INTERNAL_ERROR:
317       return STATUS_CODE_INTERNAL_ERROR;
318     case RST_STREAM_FLOW_CONTROL_ERROR:
319       return STATUS_CODE_FLOW_CONTROL_ERROR;
320     case RST_STREAM_STREAM_IN_USE:
321       return STATUS_CODE_STREAM_IN_USE;
322     case RST_STREAM_STREAM_ALREADY_CLOSED:
323       return STATUS_CODE_STREAM_ALREADY_CLOSED;
324     case RST_STREAM_INVALID_CREDENTIALS:
325       return STATUS_CODE_INVALID_CREDENTIALS;
326     case RST_STREAM_FRAME_SIZE_ERROR:
327       return STATUS_CODE_FRAME_SIZE_ERROR;
328     case RST_STREAM_SETTINGS_TIMEOUT:
329       return STATUS_CODE_SETTINGS_TIMEOUT;
330     case RST_STREAM_CONNECT_ERROR:
331       return STATUS_CODE_CONNECT_ERROR;
332     case RST_STREAM_ENHANCE_YOUR_CALM:
333       return STATUS_CODE_ENHANCE_YOUR_CALM;
334     default:
335       NOTREACHED();
336       return static_cast<SpdyProtocolErrorDetails>(-1);
337   }
338 }
339
340 SpdyStreamRequest::SpdyStreamRequest() : weak_ptr_factory_(this) {
341   Reset();
342 }
343
344 SpdyStreamRequest::~SpdyStreamRequest() {
345   CancelRequest();
346 }
347
348 int SpdyStreamRequest::StartRequest(
349     SpdyStreamType type,
350     const base::WeakPtr<SpdySession>& session,
351     const GURL& url,
352     RequestPriority priority,
353     const BoundNetLog& net_log,
354     const CompletionCallback& callback) {
355   DCHECK(session);
356   DCHECK(!session_);
357   DCHECK(!stream_);
358   DCHECK(callback_.is_null());
359
360   type_ = type;
361   session_ = session;
362   url_ = url;
363   priority_ = priority;
364   net_log_ = net_log;
365   callback_ = callback;
366
367   base::WeakPtr<SpdyStream> stream;
368   int rv = session->TryCreateStream(weak_ptr_factory_.GetWeakPtr(), &stream);
369   if (rv == OK) {
370     Reset();
371     stream_ = stream;
372   }
373   return rv;
374 }
375
376 void SpdyStreamRequest::CancelRequest() {
377   if (session_)
378     session_->CancelStreamRequest(weak_ptr_factory_.GetWeakPtr());
379   Reset();
380   // Do this to cancel any pending CompleteStreamRequest() tasks.
381   weak_ptr_factory_.InvalidateWeakPtrs();
382 }
383
384 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() {
385   DCHECK(!session_);
386   base::WeakPtr<SpdyStream> stream = stream_;
387   DCHECK(stream);
388   Reset();
389   return stream;
390 }
391
392 void SpdyStreamRequest::OnRequestCompleteSuccess(
393     const base::WeakPtr<SpdyStream>& stream) {
394   DCHECK(session_);
395   DCHECK(!stream_);
396   DCHECK(!callback_.is_null());
397   CompletionCallback callback = callback_;
398   Reset();
399   DCHECK(stream);
400   stream_ = stream;
401   callback.Run(OK);
402 }
403
404 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) {
405   DCHECK(session_);
406   DCHECK(!stream_);
407   DCHECK(!callback_.is_null());
408   CompletionCallback callback = callback_;
409   Reset();
410   DCHECK_NE(rv, OK);
411   callback.Run(rv);
412 }
413
414 void SpdyStreamRequest::Reset() {
415   type_ = SPDY_BIDIRECTIONAL_STREAM;
416   session_.reset();
417   stream_.reset();
418   url_ = GURL();
419   priority_ = MINIMUM_PRIORITY;
420   net_log_ = BoundNetLog();
421   callback_.Reset();
422 }
423
424 SpdySession::ActiveStreamInfo::ActiveStreamInfo()
425     : stream(NULL),
426       waiting_for_syn_reply(false) {}
427
428 SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream)
429     : stream(stream),
430       waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {}
431
432 SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {}
433
434 SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {}
435
436 SpdySession::PushedStreamInfo::PushedStreamInfo(
437     SpdyStreamId stream_id,
438     base::TimeTicks creation_time)
439     : stream_id(stream_id),
440       creation_time(creation_time) {}
441
442 SpdySession::PushedStreamInfo::~PushedStreamInfo() {}
443
444 SpdySession::SpdySession(
445     const SpdySessionKey& spdy_session_key,
446     const base::WeakPtr<HttpServerProperties>& http_server_properties,
447     bool verify_domain_authentication,
448     bool enable_sending_initial_data,
449     bool enable_compression,
450     bool enable_ping_based_connection_checking,
451     NextProto default_protocol,
452     size_t stream_initial_recv_window_size,
453     size_t initial_max_concurrent_streams,
454     size_t max_concurrent_streams_limit,
455     TimeFunc time_func,
456     const HostPortPair& trusted_spdy_proxy,
457     NetLog* net_log)
458     : weak_factory_(this),
459       in_io_loop_(false),
460       spdy_session_key_(spdy_session_key),
461       pool_(NULL),
462       http_server_properties_(http_server_properties),
463       read_buffer_(new IOBuffer(kReadBufferSize)),
464       stream_hi_water_mark_(kFirstStreamId),
465       in_flight_write_frame_type_(DATA),
466       in_flight_write_frame_size_(0),
467       is_secure_(false),
468       certificate_error_code_(OK),
469       availability_state_(STATE_AVAILABLE),
470       read_state_(READ_STATE_DO_READ),
471       write_state_(WRITE_STATE_IDLE),
472       error_on_close_(OK),
473       max_concurrent_streams_(initial_max_concurrent_streams == 0 ?
474                               kInitialMaxConcurrentStreams :
475                               initial_max_concurrent_streams),
476       max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ?
477                                     kMaxConcurrentStreamLimit :
478                                     max_concurrent_streams_limit),
479       streams_initiated_count_(0),
480       streams_pushed_count_(0),
481       streams_pushed_and_claimed_count_(0),
482       streams_abandoned_count_(0),
483       total_bytes_received_(0),
484       sent_settings_(false),
485       received_settings_(false),
486       stalled_streams_(0),
487       pings_in_flight_(0),
488       next_ping_id_(1),
489       last_activity_time_(time_func()),
490       last_compressed_frame_len_(0),
491       check_ping_status_pending_(false),
492       send_connection_header_prefix_(false),
493       flow_control_state_(FLOW_CONTROL_NONE),
494       stream_initial_send_window_size_(kSpdyStreamInitialWindowSize),
495       stream_initial_recv_window_size_(stream_initial_recv_window_size == 0 ?
496                                        kDefaultInitialRecvWindowSize :
497                                        stream_initial_recv_window_size),
498       session_send_window_size_(0),
499       session_recv_window_size_(0),
500       session_unacked_recv_window_bytes_(0),
501       net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
502       verify_domain_authentication_(verify_domain_authentication),
503       enable_sending_initial_data_(enable_sending_initial_data),
504       enable_compression_(enable_compression),
505       enable_ping_based_connection_checking_(
506           enable_ping_based_connection_checking),
507       protocol_(default_protocol),
508       connection_at_risk_of_loss_time_(
509           base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
510       hung_interval_(
511           base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
512       trusted_spdy_proxy_(trusted_spdy_proxy),
513       time_func_(time_func) {
514   DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
515   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
516   DCHECK(HttpStreamFactory::spdy_enabled());
517   net_log_.BeginEvent(
518       NetLog::TYPE_SPDY_SESSION,
519       base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
520   next_unclaimed_push_stream_sweep_time_ = time_func_() +
521       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
522   // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
523 }
524
525 SpdySession::~SpdySession() {
526   CHECK(!in_io_loop_);
527   DCHECK(!pool_);
528   DcheckClosed();
529
530   // TODO(akalin): Check connection->is_initialized() instead. This
531   // requires re-working CreateFakeSpdySession(), though.
532   DCHECK(connection_->socket());
533   // With SPDY we can't recycle sockets.
534   connection_->socket()->Disconnect();
535
536   RecordHistograms();
537
538   net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
539 }
540
541 void SpdySession::InitializeWithSocket(
542     scoped_ptr<ClientSocketHandle> connection,
543     SpdySessionPool* pool,
544     bool is_secure,
545     int certificate_error_code) {
546   CHECK(!in_io_loop_);
547   DCHECK_EQ(availability_state_, STATE_AVAILABLE);
548   DCHECK_EQ(read_state_, READ_STATE_DO_READ);
549   DCHECK_EQ(write_state_, WRITE_STATE_IDLE);
550   DCHECK(!connection_);
551
552   DCHECK(certificate_error_code == OK ||
553          certificate_error_code < ERR_IO_PENDING);
554   // TODO(akalin): Check connection->is_initialized() instead. This
555   // requires re-working CreateFakeSpdySession(), though.
556   DCHECK(connection->socket());
557
558   base::StatsCounter spdy_sessions("spdy.sessions");
559   spdy_sessions.Increment();
560
561   connection_ = connection.Pass();
562   is_secure_ = is_secure;
563   certificate_error_code_ = certificate_error_code;
564
565   NextProto protocol_negotiated =
566       connection_->socket()->GetNegotiatedProtocol();
567   if (protocol_negotiated != kProtoUnknown) {
568     protocol_ = protocol_negotiated;
569   }
570   DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
571   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
572
573   if (protocol_ == kProtoSPDY4)
574     send_connection_header_prefix_ = true;
575
576   if (protocol_ >= kProtoSPDY31) {
577     flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION;
578     session_send_window_size_ = kSpdySessionInitialWindowSize;
579     session_recv_window_size_ = kSpdySessionInitialWindowSize;
580   } else if (protocol_ >= kProtoSPDY3) {
581     flow_control_state_ = FLOW_CONTROL_STREAM;
582   } else {
583     flow_control_state_ = FLOW_CONTROL_NONE;
584   }
585
586   buffered_spdy_framer_.reset(
587       new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_),
588                              enable_compression_));
589   buffered_spdy_framer_->set_visitor(this);
590   buffered_spdy_framer_->set_debug_visitor(this);
591   UMA_HISTOGRAM_ENUMERATION("Net.SpdyVersion", protocol_, kProtoMaximumVersion);
592 #if defined(SPDY_PROXY_AUTH_ORIGIN)
593   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy",
594                         host_port_pair().Equals(HostPortPair::FromURL(
595                             GURL(SPDY_PROXY_AUTH_ORIGIN))));
596 #endif
597
598   net_log_.AddEvent(
599       NetLog::TYPE_SPDY_SESSION_INITIALIZED,
600       connection_->socket()->NetLog().source().ToEventParametersCallback());
601
602   DCHECK_NE(availability_state_, STATE_CLOSED);
603   connection_->AddHigherLayeredPool(this);
604   if (enable_sending_initial_data_)
605     SendInitialData();
606   pool_ = pool;
607
608   // Bootstrap the read loop.
609   base::MessageLoop::current()->PostTask(
610       FROM_HERE,
611       base::Bind(&SpdySession::PumpReadLoop,
612                  weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
613 }
614
615 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
616   if (!verify_domain_authentication_)
617     return true;
618
619   if (availability_state_ == STATE_CLOSED)
620     return false;
621
622   SSLInfo ssl_info;
623   bool was_npn_negotiated;
624   NextProto protocol_negotiated = kProtoUnknown;
625   if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
626     return true;   // This is not a secure session, so all domains are okay.
627
628   bool unused = false;
629   return
630       !ssl_info.client_cert_sent &&
631       (!ssl_info.channel_id_sent ||
632        (ServerBoundCertService::GetDomainForHost(domain) ==
633         ServerBoundCertService::GetDomainForHost(host_port_pair().host()))) &&
634       ssl_info.cert->VerifyNameMatch(domain, &unused);
635 }
636
637 int SpdySession::GetPushStream(
638     const GURL& url,
639     base::WeakPtr<SpdyStream>* stream,
640     const BoundNetLog& stream_net_log) {
641   CHECK(!in_io_loop_);
642
643   stream->reset();
644
645   // TODO(akalin): Add unit test exercising this code path.
646   if (availability_state_ == STATE_CLOSED)
647     return ERR_CONNECTION_CLOSED;
648
649   Error err = TryAccessStream(url);
650   if (err != OK)
651     return err;
652
653   *stream = GetActivePushStream(url);
654   if (*stream) {
655     DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
656     streams_pushed_and_claimed_count_++;
657   }
658   return OK;
659 }
660
661 // {,Try}CreateStream() and TryAccessStream() can be called with
662 // |in_io_loop_| set if a stream is being created in response to
663 // another being closed due to received data.
664
665 Error SpdySession::TryAccessStream(const GURL& url) {
666   DCHECK_NE(availability_state_, STATE_CLOSED);
667
668   if (is_secure_ && certificate_error_code_ != OK &&
669       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
670     RecordProtocolErrorHistogram(
671         PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
672     CloseSessionResult result = DoCloseSession(
673         static_cast<Error>(certificate_error_code_),
674         "Tried to get SPDY stream for secure content over an unauthenticated "
675         "session.");
676     DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
677     return ERR_SPDY_PROTOCOL_ERROR;
678   }
679   return OK;
680 }
681
682 int SpdySession::TryCreateStream(
683     const base::WeakPtr<SpdyStreamRequest>& request,
684     base::WeakPtr<SpdyStream>* stream) {
685   DCHECK(request);
686
687   if (availability_state_ == STATE_GOING_AWAY)
688     return ERR_FAILED;
689
690   // TODO(akalin): Add unit test exercising this code path.
691   if (availability_state_ == STATE_CLOSED)
692     return ERR_CONNECTION_CLOSED;
693
694   Error err = TryAccessStream(request->url());
695   if (err != OK)
696     return err;
697
698   if (!max_concurrent_streams_ ||
699       (active_streams_.size() + created_streams_.size() <
700        max_concurrent_streams_)) {
701     return CreateStream(*request, stream);
702   }
703
704   stalled_streams_++;
705   net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
706   RequestPriority priority = request->priority();
707   CHECK_GE(priority, MINIMUM_PRIORITY);
708   CHECK_LE(priority, MAXIMUM_PRIORITY);
709   pending_create_stream_queues_[priority].push_back(request);
710   return ERR_IO_PENDING;
711 }
712
713 int SpdySession::CreateStream(const SpdyStreamRequest& request,
714                               base::WeakPtr<SpdyStream>* stream) {
715   DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
716   DCHECK_LE(request.priority(), MAXIMUM_PRIORITY);
717
718   if (availability_state_ == STATE_GOING_AWAY)
719     return ERR_FAILED;
720
721   // TODO(akalin): Add unit test exercising this code path.
722   if (availability_state_ == STATE_CLOSED)
723     return ERR_CONNECTION_CLOSED;
724
725   Error err = TryAccessStream(request.url());
726   if (err != OK) {
727     // This should have been caught in TryCreateStream().
728     NOTREACHED();
729     return err;
730   }
731
732   DCHECK(connection_->socket());
733   DCHECK(connection_->socket()->IsConnected());
734   if (connection_->socket()) {
735     UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
736                           connection_->socket()->IsConnected());
737     if (!connection_->socket()->IsConnected()) {
738       CloseSessionResult result = DoCloseSession(
739           ERR_CONNECTION_CLOSED,
740           "Tried to create SPDY stream for a closed socket connection.");
741       DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
742       return ERR_CONNECTION_CLOSED;
743     }
744   }
745
746   scoped_ptr<SpdyStream> new_stream(
747       new SpdyStream(request.type(), GetWeakPtr(), request.url(),
748                      request.priority(),
749                      stream_initial_send_window_size_,
750                      stream_initial_recv_window_size_,
751                      request.net_log()));
752   *stream = new_stream->GetWeakPtr();
753   InsertCreatedStream(new_stream.Pass());
754
755   UMA_HISTOGRAM_CUSTOM_COUNTS(
756       "Net.SpdyPriorityCount",
757       static_cast<int>(request.priority()), 0, 10, 11);
758
759   return OK;
760 }
761
762 void SpdySession::CancelStreamRequest(
763     const base::WeakPtr<SpdyStreamRequest>& request) {
764   DCHECK(request);
765   RequestPriority priority = request->priority();
766   CHECK_GE(priority, MINIMUM_PRIORITY);
767   CHECK_LE(priority, MAXIMUM_PRIORITY);
768
769 #if DCHECK_IS_ON
770   // |request| should not be in a queue not matching its priority.
771   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
772     if (priority == i)
773       continue;
774     PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
775     DCHECK(std::find_if(queue->begin(),
776                         queue->end(),
777                         RequestEquals(request)) == queue->end());
778   }
779 #endif
780
781   PendingStreamRequestQueue* queue =
782       &pending_create_stream_queues_[priority];
783   // Remove |request| from |queue| while preserving the order of the
784   // other elements.
785   PendingStreamRequestQueue::iterator it =
786       std::find_if(queue->begin(), queue->end(), RequestEquals(request));
787   // The request may already be removed if there's a
788   // CompleteStreamRequest() in flight.
789   if (it != queue->end()) {
790     it = queue->erase(it);
791     // |request| should be in the queue at most once, and if it is
792     // present, should not be pending completion.
793     DCHECK(std::find_if(it, queue->end(), RequestEquals(request)) ==
794            queue->end());
795   }
796 }
797
798 base::WeakPtr<SpdyStreamRequest> SpdySession::GetNextPendingStreamRequest() {
799   for (int j = MAXIMUM_PRIORITY; j >= MINIMUM_PRIORITY; --j) {
800     if (pending_create_stream_queues_[j].empty())
801       continue;
802
803     base::WeakPtr<SpdyStreamRequest> pending_request =
804         pending_create_stream_queues_[j].front();
805     DCHECK(pending_request);
806     pending_create_stream_queues_[j].pop_front();
807     return pending_request;
808   }
809   return base::WeakPtr<SpdyStreamRequest>();
810 }
811
812 void SpdySession::ProcessPendingStreamRequests() {
813   // Like |max_concurrent_streams_|, 0 means infinite for
814   // |max_requests_to_process|.
815   size_t max_requests_to_process = 0;
816   if (max_concurrent_streams_ != 0) {
817     max_requests_to_process =
818         max_concurrent_streams_ -
819         (active_streams_.size() + created_streams_.size());
820   }
821   for (size_t i = 0;
822        max_requests_to_process == 0 || i < max_requests_to_process; ++i) {
823     base::WeakPtr<SpdyStreamRequest> pending_request =
824         GetNextPendingStreamRequest();
825     if (!pending_request)
826       break;
827
828     base::MessageLoop::current()->PostTask(
829         FROM_HERE,
830         base::Bind(&SpdySession::CompleteStreamRequest,
831                    weak_factory_.GetWeakPtr(),
832                    pending_request));
833   }
834 }
835
836 void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) {
837   pooled_aliases_.insert(alias_key);
838 }
839
840 SpdyMajorVersion SpdySession::GetProtocolVersion() const {
841   DCHECK(buffered_spdy_framer_.get());
842   return buffered_spdy_framer_->protocol_version();
843 }
844
845 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
846   return weak_factory_.GetWeakPtr();
847 }
848
849 bool SpdySession::CloseOneIdleConnection() {
850   CHECK(!in_io_loop_);
851   DCHECK_NE(availability_state_, STATE_CLOSED);
852   DCHECK(pool_);
853   if (!active_streams_.empty())
854     return false;
855   CloseSessionResult result =
856       DoCloseSession(ERR_CONNECTION_CLOSED, "Closing one idle connection.");
857   if (result != SESSION_CLOSED_AND_REMOVED) {
858     NOTREACHED();
859     return false;
860   }
861   return true;
862 }
863
864 void SpdySession::EnqueueStreamWrite(
865     const base::WeakPtr<SpdyStream>& stream,
866     SpdyFrameType frame_type,
867     scoped_ptr<SpdyBufferProducer> producer) {
868   DCHECK(frame_type == HEADERS ||
869          frame_type == DATA ||
870          frame_type == CREDENTIAL ||
871          frame_type == SYN_STREAM);
872   EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
873 }
874
875 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
876     SpdyStreamId stream_id,
877     RequestPriority priority,
878     SpdyControlFlags flags,
879     const SpdyHeaderBlock& headers) {
880   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
881   CHECK(it != active_streams_.end());
882   CHECK_EQ(it->second.stream->stream_id(), stream_id);
883
884   SendPrefacePingIfNoneInFlight();
885
886   DCHECK(buffered_spdy_framer_.get());
887   SpdyPriority spdy_priority =
888       ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion());
889   scoped_ptr<SpdyFrame> syn_frame(
890       buffered_spdy_framer_->CreateSynStream(stream_id, 0, spdy_priority, flags,
891                                              &headers));
892
893   base::StatsCounter spdy_requests("spdy.requests");
894   spdy_requests.Increment();
895   streams_initiated_count_++;
896
897   if (net_log().IsLogging()) {
898     net_log().AddEvent(
899         NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
900         base::Bind(&NetLogSpdySynStreamSentCallback, &headers,
901                    (flags & CONTROL_FLAG_FIN) != 0,
902                    (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
903                    spdy_priority,
904                    stream_id));
905   }
906
907   return syn_frame.Pass();
908 }
909
910 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
911                                                      IOBuffer* data,
912                                                      int len,
913                                                      SpdyDataFlags flags) {
914   if (availability_state_ == STATE_CLOSED) {
915     NOTREACHED();
916     return scoped_ptr<SpdyBuffer>();
917   }
918
919   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
920   CHECK(it != active_streams_.end());
921   SpdyStream* stream = it->second.stream;
922   CHECK_EQ(stream->stream_id(), stream_id);
923
924   if (len < 0) {
925     NOTREACHED();
926     return scoped_ptr<SpdyBuffer>();
927   }
928
929   int effective_len = std::min(len, kMaxSpdyFrameChunkSize);
930
931   bool send_stalled_by_stream =
932       (flow_control_state_ >= FLOW_CONTROL_STREAM) &&
933       (stream->send_window_size() <= 0);
934   bool send_stalled_by_session = IsSendStalled();
935
936   // NOTE: There's an enum of the same name in histograms.xml.
937   enum SpdyFrameFlowControlState {
938     SEND_NOT_STALLED,
939     SEND_STALLED_BY_STREAM,
940     SEND_STALLED_BY_SESSION,
941     SEND_STALLED_BY_STREAM_AND_SESSION,
942   };
943
944   SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED;
945   if (send_stalled_by_stream) {
946     if (send_stalled_by_session) {
947       frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION;
948     } else {
949       frame_flow_control_state = SEND_STALLED_BY_STREAM;
950     }
951   } else if (send_stalled_by_session) {
952     frame_flow_control_state = SEND_STALLED_BY_SESSION;
953   }
954
955   if (flow_control_state_ == FLOW_CONTROL_STREAM) {
956     UMA_HISTOGRAM_ENUMERATION(
957         "Net.SpdyFrameStreamFlowControlState",
958         frame_flow_control_state,
959         SEND_STALLED_BY_STREAM + 1);
960   } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
961     UMA_HISTOGRAM_ENUMERATION(
962         "Net.SpdyFrameStreamAndSessionFlowControlState",
963         frame_flow_control_state,
964         SEND_STALLED_BY_STREAM_AND_SESSION + 1);
965   }
966
967   // Obey send window size of the stream if stream flow control is
968   // enabled.
969   if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
970     if (send_stalled_by_stream) {
971       stream->set_send_stalled_by_flow_control(true);
972       // Even though we're currently stalled only by the stream, we
973       // might end up being stalled by the session also.
974       QueueSendStalledStream(*stream);
975       net_log().AddEvent(
976           NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW,
977           NetLog::IntegerCallback("stream_id", stream_id));
978       return scoped_ptr<SpdyBuffer>();
979     }
980
981     effective_len = std::min(effective_len, stream->send_window_size());
982   }
983
984   // Obey send window size of the session if session flow control is
985   // enabled.
986   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
987     if (send_stalled_by_session) {
988       stream->set_send_stalled_by_flow_control(true);
989       QueueSendStalledStream(*stream);
990       net_log().AddEvent(
991           NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW,
992           NetLog::IntegerCallback("stream_id", stream_id));
993       return scoped_ptr<SpdyBuffer>();
994     }
995
996     effective_len = std::min(effective_len, session_send_window_size_);
997   }
998
999   DCHECK_GE(effective_len, 0);
1000
1001   // Clear FIN flag if only some of the data will be in the data
1002   // frame.
1003   if (effective_len < len)
1004     flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
1005
1006   if (net_log().IsLogging()) {
1007     net_log().AddEvent(
1008         NetLog::TYPE_SPDY_SESSION_SEND_DATA,
1009         base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len,
1010                    (flags & DATA_FLAG_FIN) != 0));
1011   }
1012
1013   // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
1014   if (effective_len > 0)
1015     SendPrefacePingIfNoneInFlight();
1016
1017   // TODO(mbelshe): reduce memory copies here.
1018   DCHECK(buffered_spdy_framer_.get());
1019   scoped_ptr<SpdyFrame> frame(
1020       buffered_spdy_framer_->CreateDataFrame(
1021           stream_id, data->data(),
1022           static_cast<uint32>(effective_len), flags));
1023
1024   scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));
1025
1026   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1027     DecreaseSendWindowSize(static_cast<int32>(effective_len));
1028     data_buffer->AddConsumeCallback(
1029         base::Bind(&SpdySession::OnWriteBufferConsumed,
1030                    weak_factory_.GetWeakPtr(),
1031                    static_cast<size_t>(effective_len)));
1032   }
1033
1034   return data_buffer.Pass();
1035 }
1036
1037 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) {
1038   DCHECK_NE(stream_id, 0u);
1039
1040   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1041   if (it == active_streams_.end()) {
1042     NOTREACHED();
1043     return;
1044   }
1045
1046   CloseActiveStreamIterator(it, status);
1047 }
1048
1049 void SpdySession::CloseCreatedStream(
1050     const base::WeakPtr<SpdyStream>& stream, int status) {
1051   DCHECK_EQ(stream->stream_id(), 0u);
1052
1053   CreatedStreamSet::iterator it = created_streams_.find(stream.get());
1054   if (it == created_streams_.end()) {
1055     NOTREACHED();
1056     return;
1057   }
1058
1059   CloseCreatedStreamIterator(it, status);
1060 }
1061
1062 void SpdySession::ResetStream(SpdyStreamId stream_id,
1063                               SpdyRstStreamStatus status,
1064                               const std::string& description) {
1065   DCHECK_NE(stream_id, 0u);
1066
1067   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1068   if (it == active_streams_.end()) {
1069     NOTREACHED();
1070     return;
1071   }
1072
1073   ResetStreamIterator(it, status, description);
1074 }
1075
1076 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
1077   return ContainsKey(active_streams_, stream_id);
1078 }
1079
1080 LoadState SpdySession::GetLoadState() const {
1081   // Just report that we're idle since the session could be doing
1082   // many things concurrently.
1083   return LOAD_STATE_IDLE;
1084 }
1085
1086 void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
1087                                             int status) {
1088   // TODO(mbelshe): We should send a RST_STREAM control frame here
1089   //                so that the server can cancel a large send.
1090
1091   scoped_ptr<SpdyStream> owned_stream(it->second.stream);
1092   active_streams_.erase(it);
1093
1094   // TODO(akalin): When SpdyStream was ref-counted (and
1095   // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
1096   // was only done when status was not OK. This meant that pushed
1097   // streams can still be claimed after they're closed. This is
1098   // probably something that we still want to support, although server
1099   // push is hardly used. Write tests for this and fix this. (See
1100   // http://crbug.com/261712 .)
1101   if (owned_stream->type() == SPDY_PUSH_STREAM)
1102     unclaimed_pushed_streams_.erase(owned_stream->url());
1103
1104   base::WeakPtr<SpdySession> weak_this = GetWeakPtr();
1105
1106   DeleteStream(owned_stream.Pass(), status);
1107
1108   if (!weak_this)
1109     return;
1110
1111   if (availability_state_ == STATE_CLOSED)
1112     return;
1113
1114   // If there are no active streams and the socket pool is stalled, close the
1115   // session to free up a socket slot.
1116   if (active_streams_.empty() && connection_->IsPoolStalled()) {
1117     CloseSessionResult result =
1118         DoCloseSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1119     CHECK_NE(result, SESSION_ALREADY_CLOSED);
1120   }
1121 }
1122
1123 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
1124                                              int status) {
1125   scoped_ptr<SpdyStream> owned_stream(*it);
1126   created_streams_.erase(it);
1127   DeleteStream(owned_stream.Pass(), status);
1128 }
1129
1130 void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it,
1131                                       SpdyRstStreamStatus status,
1132                                       const std::string& description) {
1133   // Send the RST_STREAM frame first as CloseActiveStreamIterator()
1134   // may close us.
1135   SpdyStreamId stream_id = it->first;
1136   RequestPriority priority = it->second.stream->priority();
1137   EnqueueResetStreamFrame(stream_id, priority, status, description);
1138
1139   // Removes any pending writes for the stream except for possibly an
1140   // in-flight one.
1141   CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
1142 }
1143
1144 void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id,
1145                                           RequestPriority priority,
1146                                           SpdyRstStreamStatus status,
1147                                           const std::string& description) {
1148   DCHECK_NE(stream_id, 0u);
1149
1150   net_log().AddEvent(
1151       NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
1152       base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));
1153
1154   DCHECK(buffered_spdy_framer_.get());
1155   scoped_ptr<SpdyFrame> rst_frame(
1156       buffered_spdy_framer_->CreateRstStream(stream_id, status));
1157
1158   EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
1159   RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status));
1160 }
1161
1162 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
1163   CHECK(!in_io_loop_);
1164   CHECK_NE(availability_state_, STATE_CLOSED);
1165   CHECK_EQ(read_state_, expected_read_state);
1166
1167   result = DoReadLoop(expected_read_state, result);
1168
1169   if (availability_state_ == STATE_CLOSED) {
1170     CHECK_EQ(result, error_on_close_);
1171     CHECK_LT(error_on_close_, ERR_IO_PENDING);
1172     RemoveFromPool();
1173     return;
1174   }
1175
1176   CHECK(result == OK || result == ERR_IO_PENDING);
1177 }
1178
1179 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
1180   CHECK(!in_io_loop_);
1181   CHECK_NE(availability_state_, STATE_CLOSED);
1182   CHECK_EQ(read_state_, expected_read_state);
1183
1184   in_io_loop_ = true;
1185
1186   int bytes_read_without_yielding = 0;
1187
1188   // Loop until the session is closed, the read becomes blocked, or
1189   // the read limit is exceeded.
1190   while (true) {
1191     switch (read_state_) {
1192       case READ_STATE_DO_READ:
1193         CHECK_EQ(result, OK);
1194         result = DoRead();
1195         break;
1196       case READ_STATE_DO_READ_COMPLETE:
1197         if (result > 0)
1198           bytes_read_without_yielding += result;
1199         result = DoReadComplete(result);
1200         break;
1201       default:
1202         NOTREACHED() << "read_state_: " << read_state_;
1203         break;
1204     }
1205
1206     if (availability_state_ == STATE_CLOSED) {
1207       CHECK_EQ(result, error_on_close_);
1208       CHECK_LT(result, ERR_IO_PENDING);
1209       break;
1210     }
1211
1212     if (result == ERR_IO_PENDING)
1213       break;
1214
1215     if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
1216       read_state_ = READ_STATE_DO_READ;
1217       base::MessageLoop::current()->PostTask(
1218           FROM_HERE,
1219           base::Bind(&SpdySession::PumpReadLoop,
1220                      weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
1221       result = ERR_IO_PENDING;
1222       break;
1223     }
1224   }
1225
1226   CHECK(in_io_loop_);
1227   in_io_loop_ = false;
1228
1229   return result;
1230 }
1231
1232 int SpdySession::DoRead() {
1233   CHECK(in_io_loop_);
1234   CHECK_NE(availability_state_, STATE_CLOSED);
1235
1236   CHECK(connection_);
1237   CHECK(connection_->socket());
1238   read_state_ = READ_STATE_DO_READ_COMPLETE;
1239   return connection_->socket()->Read(
1240       read_buffer_.get(),
1241       kReadBufferSize,
1242       base::Bind(&SpdySession::PumpReadLoop,
1243                  weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
1244 }
1245
1246 int SpdySession::DoReadComplete(int result) {
1247   CHECK(in_io_loop_);
1248   DCHECK_NE(availability_state_, STATE_CLOSED);
1249
1250   // Parse a frame.  For now this code requires that the frame fit into our
1251   // buffer (kReadBufferSize).
1252   // TODO(mbelshe): support arbitrarily large frames!
1253
1254   if (result == 0) {
1255     UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
1256                                 total_bytes_received_, 1, 100000000, 50);
1257     CloseSessionResult close_session_result =
1258         DoCloseSession(ERR_CONNECTION_CLOSED, "Connection closed");
1259     DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
1260     DCHECK_EQ(availability_state_, STATE_CLOSED);
1261     DCHECK_EQ(error_on_close_, ERR_CONNECTION_CLOSED);
1262     return ERR_CONNECTION_CLOSED;
1263   }
1264
1265   if (result < 0) {
1266     CloseSessionResult close_session_result =
1267         DoCloseSession(static_cast<Error>(result), "result is < 0.");
1268     DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
1269     DCHECK_EQ(availability_state_, STATE_CLOSED);
1270     DCHECK_EQ(error_on_close_, result);
1271     return result;
1272   }
1273   CHECK_LE(result, kReadBufferSize);
1274   total_bytes_received_ += result;
1275
1276   last_activity_time_ = time_func_();
1277
1278   DCHECK(buffered_spdy_framer_.get());
1279   char* data = read_buffer_->data();
1280   while (result > 0) {
1281     uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
1282     result -= bytes_processed;
1283     data += bytes_processed;
1284
1285     if (availability_state_ == STATE_CLOSED) {
1286       DCHECK_LT(error_on_close_, ERR_IO_PENDING);
1287       return error_on_close_;
1288     }
1289
1290     DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
1291   }
1292
1293   read_state_ = READ_STATE_DO_READ;
1294   return OK;
1295 }
1296
1297 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
1298   CHECK(!in_io_loop_);
1299   DCHECK_NE(availability_state_, STATE_CLOSED);
1300   DCHECK_EQ(write_state_, expected_write_state);
1301
1302   result = DoWriteLoop(expected_write_state, result);
1303
1304   if (availability_state_ == STATE_CLOSED) {
1305     DCHECK_EQ(result, error_on_close_);
1306     DCHECK_LT(error_on_close_, ERR_IO_PENDING);
1307     RemoveFromPool();
1308     return;
1309   }
1310
1311   DCHECK(result == OK || result == ERR_IO_PENDING);
1312 }
1313
1314 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
1315   CHECK(!in_io_loop_);
1316   DCHECK_NE(availability_state_, STATE_CLOSED);
1317   DCHECK_NE(write_state_, WRITE_STATE_IDLE);
1318   DCHECK_EQ(write_state_, expected_write_state);
1319
1320   in_io_loop_ = true;
1321
1322   // Loop until the session is closed or the write becomes blocked.
1323   while (true) {
1324     switch (write_state_) {
1325       case WRITE_STATE_DO_WRITE:
1326         DCHECK_EQ(result, OK);
1327         result = DoWrite();
1328         break;
1329       case WRITE_STATE_DO_WRITE_COMPLETE:
1330         result = DoWriteComplete(result);
1331         break;
1332       case WRITE_STATE_IDLE:
1333       default:
1334         NOTREACHED() << "write_state_: " << write_state_;
1335         break;
1336     }
1337
1338     if (availability_state_ == STATE_CLOSED) {
1339       DCHECK_EQ(result, error_on_close_);
1340       DCHECK_LT(result, ERR_IO_PENDING);
1341       break;
1342     }
1343
1344     if (write_state_ == WRITE_STATE_IDLE) {
1345       DCHECK_EQ(result, ERR_IO_PENDING);
1346       break;
1347     }
1348
1349     if (result == ERR_IO_PENDING)
1350       break;
1351   }
1352
1353   CHECK(in_io_loop_);
1354   in_io_loop_ = false;
1355
1356   return result;
1357 }
1358
1359 int SpdySession::DoWrite() {
1360   CHECK(in_io_loop_);
1361   DCHECK_NE(availability_state_, STATE_CLOSED);
1362
1363   DCHECK(buffered_spdy_framer_);
1364   if (in_flight_write_) {
1365     DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1366   } else {
1367     // Grab the next frame to send.
1368     SpdyFrameType frame_type = DATA;
1369     scoped_ptr<SpdyBufferProducer> producer;
1370     base::WeakPtr<SpdyStream> stream;
1371     if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
1372       write_state_ = WRITE_STATE_IDLE;
1373       return ERR_IO_PENDING;
1374     }
1375
1376     if (stream.get())
1377       CHECK(!stream->IsClosed());
1378
1379     // Activate the stream only when sending the SYN_STREAM frame to
1380     // guarantee monotonically-increasing stream IDs.
1381     if (frame_type == SYN_STREAM) {
1382       CHECK(stream.get());
1383       CHECK_EQ(stream->stream_id(), 0u);
1384       scoped_ptr<SpdyStream> owned_stream =
1385           ActivateCreatedStream(stream.get());
1386       InsertActivatedStream(owned_stream.Pass());
1387     }
1388
1389     in_flight_write_ = producer->ProduceBuffer();
1390     if (!in_flight_write_) {
1391       NOTREACHED();
1392       return ERR_UNEXPECTED;
1393     }
1394     in_flight_write_frame_type_ = frame_type;
1395     in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
1396     DCHECK_GE(in_flight_write_frame_size_,
1397               buffered_spdy_framer_->GetFrameMinimumSize());
1398     in_flight_write_stream_ = stream;
1399   }
1400
1401   write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;
1402
1403   // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
1404   // with Socket implementations that don't store their IOBuffer
1405   // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
1406   scoped_refptr<IOBuffer> write_io_buffer =
1407       in_flight_write_->GetIOBufferForRemainingData();
1408   return connection_->socket()->Write(
1409       write_io_buffer.get(),
1410       in_flight_write_->GetRemainingSize(),
1411       base::Bind(&SpdySession::PumpWriteLoop,
1412                  weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
1413 }
1414
1415 int SpdySession::DoWriteComplete(int result) {
1416   CHECK(in_io_loop_);
1417   DCHECK_NE(availability_state_, STATE_CLOSED);
1418   DCHECK_NE(result, ERR_IO_PENDING);
1419   DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1420
1421   last_activity_time_ = time_func_();
1422
1423   if (result < 0) {
1424     DCHECK_NE(result, ERR_IO_PENDING);
1425     in_flight_write_.reset();
1426     in_flight_write_frame_type_ = DATA;
1427     in_flight_write_frame_size_ = 0;
1428     in_flight_write_stream_.reset();
1429     CloseSessionResult close_session_result =
1430         DoCloseSession(static_cast<Error>(result), "Write error");
1431     DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
1432     DCHECK_EQ(availability_state_, STATE_CLOSED);
1433     DCHECK_EQ(error_on_close_, result);
1434     return result;
1435   }
1436
1437   // It should not be possible to have written more bytes than our
1438   // in_flight_write_.
1439   DCHECK_LE(static_cast<size_t>(result),
1440             in_flight_write_->GetRemainingSize());
1441
1442   if (result > 0) {
1443     in_flight_write_->Consume(static_cast<size_t>(result));
1444
1445     // We only notify the stream when we've fully written the pending frame.
1446     if (in_flight_write_->GetRemainingSize() == 0) {
1447       // It is possible that the stream was cancelled while we were
1448       // writing to the socket.
1449       if (in_flight_write_stream_.get()) {
1450         DCHECK_GT(in_flight_write_frame_size_, 0u);
1451         in_flight_write_stream_->OnFrameWriteComplete(
1452             in_flight_write_frame_type_,
1453             in_flight_write_frame_size_);
1454       }
1455
1456       // Cleanup the write which just completed.
1457       in_flight_write_.reset();
1458       in_flight_write_frame_type_ = DATA;
1459       in_flight_write_frame_size_ = 0;
1460       in_flight_write_stream_.reset();
1461     }
1462   }
1463
1464   write_state_ = WRITE_STATE_DO_WRITE;
1465   return OK;
1466 }
1467
1468 void SpdySession::DcheckGoingAway() const {
1469 #if DCHECK_IS_ON
1470   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1471   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
1472     DCHECK(pending_create_stream_queues_[i].empty());
1473   }
1474   DCHECK(created_streams_.empty());
1475 #endif
1476 }
1477
1478 void SpdySession::DcheckClosed() const {
1479   DcheckGoingAway();
1480   DCHECK_EQ(availability_state_, STATE_CLOSED);
1481   DCHECK_LT(error_on_close_, ERR_IO_PENDING);
1482   DCHECK(active_streams_.empty());
1483   DCHECK(unclaimed_pushed_streams_.empty());
1484   DCHECK(write_queue_.IsEmpty());
1485 }
1486
1487 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
1488                                  Error status) {
1489   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1490
1491   // The loops below are carefully written to avoid reentrancy problems.
1492
1493   while (true) {
1494     size_t old_size = GetTotalSize(pending_create_stream_queues_);
1495     base::WeakPtr<SpdyStreamRequest> pending_request =
1496         GetNextPendingStreamRequest();
1497     if (!pending_request)
1498       break;
1499     // No new stream requests should be added while the session is
1500     // going away.
1501     DCHECK_GT(old_size, GetTotalSize(pending_create_stream_queues_));
1502     pending_request->OnRequestCompleteFailure(ERR_ABORTED);
1503   }
1504
1505   while (true) {
1506     size_t old_size = active_streams_.size();
1507     ActiveStreamMap::iterator it =
1508         active_streams_.lower_bound(last_good_stream_id + 1);
1509     if (it == active_streams_.end())
1510       break;
1511     LogAbandonedActiveStream(it, status);
1512     CloseActiveStreamIterator(it, status);
1513     // No new streams should be activated while the session is going
1514     // away.
1515     DCHECK_GT(old_size, active_streams_.size());
1516   }
1517
1518   while (!created_streams_.empty()) {
1519     size_t old_size = created_streams_.size();
1520     CreatedStreamSet::iterator it = created_streams_.begin();
1521     LogAbandonedStream(*it, status);
1522     CloseCreatedStreamIterator(it, status);
1523     // No new streams should be created while the session is going
1524     // away.
1525     DCHECK_GT(old_size, created_streams_.size());
1526   }
1527
1528   write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);
1529
1530   DcheckGoingAway();
1531 }
1532
1533 void SpdySession::MaybeFinishGoingAway() {
1534   DcheckGoingAway();
1535   if (active_streams_.empty() && availability_state_ != STATE_CLOSED) {
1536     CloseSessionResult result =
1537         DoCloseSession(ERR_CONNECTION_CLOSED, "Finished going away");
1538     CHECK_NE(result, SESSION_ALREADY_CLOSED);
1539   }
1540 }
1541
1542 SpdySession::CloseSessionResult SpdySession::DoCloseSession(
1543     Error err,
1544     const std::string& description) {
1545   CHECK_LT(err, ERR_IO_PENDING);
1546
1547   if (availability_state_ == STATE_CLOSED)
1548     return SESSION_ALREADY_CLOSED;
1549
1550   net_log_.AddEvent(
1551       NetLog::TYPE_SPDY_SESSION_CLOSE,
1552       base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
1553
1554   UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
1555   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
1556                               total_bytes_received_, 1, 100000000, 50);
1557
1558   CHECK(pool_);
1559   if (availability_state_ != STATE_GOING_AWAY)
1560     pool_->MakeSessionUnavailable(GetWeakPtr());
1561
1562   availability_state_ = STATE_CLOSED;
1563   error_on_close_ = err;
1564
1565   StartGoingAway(0, err);
1566   write_queue_.Clear();
1567
1568   DcheckClosed();
1569
1570   if (in_io_loop_)
1571     return SESSION_CLOSED_BUT_NOT_REMOVED;
1572
1573   RemoveFromPool();
1574   return SESSION_CLOSED_AND_REMOVED;
1575 }
1576
1577 void SpdySession::RemoveFromPool() {
1578   DcheckClosed();
1579   CHECK(pool_);
1580
1581   SpdySessionPool* pool = pool_;
1582   pool_ = NULL;
1583   pool->RemoveUnavailableSession(GetWeakPtr());
1584 }
1585
1586 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
1587   DCHECK(stream);
1588   std::string description = base::StringPrintf(
1589       "ABANDONED (stream_id=%d): ", stream->stream_id()) +
1590       stream->url().spec();
1591   stream->LogStreamError(status, description);
1592   // We don't increment the streams abandoned counter here. If the
1593   // stream isn't active (i.e., it hasn't written anything to the wire
1594   // yet) then it's as if it never existed. If it is active, then
1595   // LogAbandonedActiveStream() will increment the counters.
1596 }
1597
1598 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it,
1599                                            Error status) {
1600   DCHECK_GT(it->first, 0u);
1601   LogAbandonedStream(it->second.stream, status);
1602   ++streams_abandoned_count_;
1603   base::StatsCounter abandoned_streams("spdy.abandoned_streams");
1604   abandoned_streams.Increment();
1605   if (it->second.stream->type() == SPDY_PUSH_STREAM &&
1606       unclaimed_pushed_streams_.find(it->second.stream->url()) !=
1607       unclaimed_pushed_streams_.end()) {
1608     base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams");
1609     abandoned_push_streams.Increment();
1610   }
1611 }
1612
1613 int SpdySession::GetNewStreamId() {
1614   int id = stream_hi_water_mark_;
1615   stream_hi_water_mark_ += 2;
1616   if (stream_hi_water_mark_ > 0x7fff)
1617     stream_hi_water_mark_ = 1;
1618   return id;
1619 }
1620
1621 void SpdySession::CloseSessionOnError(Error err,
1622                                       const std::string& description) {
1623   // We may be called from anywhere, so we can't expect a particular
1624   // return value.
1625   ignore_result(DoCloseSession(err, description));
1626 }
1627
1628 void SpdySession::MakeUnavailable() {
1629   if (availability_state_ < STATE_GOING_AWAY) {
1630     availability_state_ = STATE_GOING_AWAY;
1631     DCHECK(pool_);
1632     pool_->MakeSessionUnavailable(GetWeakPtr());
1633   }
1634 }
1635
1636 base::Value* SpdySession::GetInfoAsValue() const {
1637   base::DictionaryValue* dict = new base::DictionaryValue();
1638
1639   dict->SetInteger("source_id", net_log_.source().id);
1640
1641   dict->SetString("host_port_pair", host_port_pair().ToString());
1642   if (!pooled_aliases_.empty()) {
1643     base::ListValue* alias_list = new base::ListValue();
1644     for (std::set<SpdySessionKey>::const_iterator it =
1645              pooled_aliases_.begin();
1646          it != pooled_aliases_.end(); it++) {
1647       alias_list->Append(new base::StringValue(
1648           it->host_port_pair().ToString()));
1649     }
1650     dict->Set("aliases", alias_list);
1651   }
1652   dict->SetString("proxy", host_port_proxy_pair().second.ToURI());
1653
1654   dict->SetInteger("active_streams", active_streams_.size());
1655
1656   dict->SetInteger("unclaimed_pushed_streams",
1657                    unclaimed_pushed_streams_.size());
1658
1659   dict->SetBoolean("is_secure", is_secure_);
1660
1661   dict->SetString("protocol_negotiated",
1662                   SSLClientSocket::NextProtoToString(
1663                       connection_->socket()->GetNegotiatedProtocol()));
1664
1665   dict->SetInteger("error", error_on_close_);
1666   dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
1667
1668   dict->SetInteger("streams_initiated_count", streams_initiated_count_);
1669   dict->SetInteger("streams_pushed_count", streams_pushed_count_);
1670   dict->SetInteger("streams_pushed_and_claimed_count",
1671       streams_pushed_and_claimed_count_);
1672   dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
1673   DCHECK(buffered_spdy_framer_.get());
1674   dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received());
1675
1676   dict->SetBoolean("sent_settings", sent_settings_);
1677   dict->SetBoolean("received_settings", received_settings_);
1678
1679   dict->SetInteger("send_window_size", session_send_window_size_);
1680   dict->SetInteger("recv_window_size", session_recv_window_size_);
1681   dict->SetInteger("unacked_recv_window_bytes",
1682                    session_unacked_recv_window_bytes_);
1683   return dict;
1684 }
1685
1686 bool SpdySession::IsReused() const {
1687   return buffered_spdy_framer_->frames_received() > 0 ||
1688       connection_->reuse_type() == ClientSocketHandle::UNUSED_IDLE;
1689 }
1690
1691 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id,
1692                                     LoadTimingInfo* load_timing_info) const {
1693   return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId,
1694                                         load_timing_info);
1695 }
1696
1697 int SpdySession::GetPeerAddress(IPEndPoint* address) const {
1698   int rv = ERR_SOCKET_NOT_CONNECTED;
1699   if (connection_->socket()) {
1700     rv = connection_->socket()->GetPeerAddress(address);
1701   }
1702
1703   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress",
1704                         rv == ERR_SOCKET_NOT_CONNECTED);
1705
1706   return rv;
1707 }
1708
1709 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
1710   int rv = ERR_SOCKET_NOT_CONNECTED;
1711   if (connection_->socket()) {
1712     rv = connection_->socket()->GetLocalAddress(address);
1713   }
1714
1715   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress",
1716                         rv == ERR_SOCKET_NOT_CONNECTED);
1717
1718   return rv;
1719 }
1720
1721 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
1722                                       SpdyFrameType frame_type,
1723                                       scoped_ptr<SpdyFrame> frame) {
1724   DCHECK(frame_type == RST_STREAM ||
1725          frame_type == SETTINGS ||
1726          frame_type == WINDOW_UPDATE ||
1727          frame_type == PING);
1728   EnqueueWrite(
1729       priority, frame_type,
1730       scoped_ptr<SpdyBufferProducer>(
1731           new SimpleBufferProducer(
1732               scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
1733       base::WeakPtr<SpdyStream>());
1734 }
1735
1736 void SpdySession::EnqueueWrite(RequestPriority priority,
1737                                SpdyFrameType frame_type,
1738                                scoped_ptr<SpdyBufferProducer> producer,
1739                                const base::WeakPtr<SpdyStream>& stream) {
1740   if (availability_state_ == STATE_CLOSED)
1741     return;
1742
1743   bool was_idle = write_queue_.IsEmpty();
1744   write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
1745   if (write_state_ == WRITE_STATE_IDLE) {
1746     DCHECK(was_idle);
1747     DCHECK(!in_flight_write_);
1748     write_state_ = WRITE_STATE_DO_WRITE;
1749     base::MessageLoop::current()->PostTask(
1750         FROM_HERE,
1751         base::Bind(&SpdySession::PumpWriteLoop,
1752                    weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
1753   }
1754 }
1755
1756 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
1757   CHECK_EQ(stream->stream_id(), 0u);
1758   CHECK(created_streams_.find(stream.get()) == created_streams_.end());
1759   created_streams_.insert(stream.release());
1760 }
1761
1762 scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) {
1763   CHECK_EQ(stream->stream_id(), 0u);
1764   CHECK(created_streams_.find(stream) != created_streams_.end());
1765   stream->set_stream_id(GetNewStreamId());
1766   scoped_ptr<SpdyStream> owned_stream(stream);
1767   created_streams_.erase(stream);
1768   return owned_stream.Pass();
1769 }
1770
1771 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) {
1772   SpdyStreamId stream_id = stream->stream_id();
1773   CHECK_NE(stream_id, 0u);
1774   std::pair<ActiveStreamMap::iterator, bool> result =
1775       active_streams_.insert(
1776           std::make_pair(stream_id, ActiveStreamInfo(stream.get())));
1777   CHECK(result.second);
1778   ignore_result(stream.release());
1779 }
1780
1781 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
1782   if (in_flight_write_stream_.get() == stream.get()) {
1783     // If we're deleting the stream for the in-flight write, we still
1784     // need to let the write complete, so we clear
1785     // |in_flight_write_stream_| and let the write finish on its own
1786     // without notifying |in_flight_write_stream_|.
1787     in_flight_write_stream_.reset();
1788   }
1789
1790   write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
1791
1792   // |stream->OnClose()| may end up closing |this|, so detect that.
1793   base::WeakPtr<SpdySession> weak_this = GetWeakPtr();
1794
1795   stream->OnClose(status);
1796
1797   if (!weak_this)
1798     return;
1799
1800   switch (availability_state_) {
1801     case STATE_AVAILABLE:
1802       ProcessPendingStreamRequests();
1803       break;
1804     case STATE_GOING_AWAY:
1805       DcheckGoingAway();
1806       MaybeFinishGoingAway();
1807       break;
1808     case STATE_CLOSED:
1809       // Do nothing.
1810       break;
1811   }
1812 }
1813
1814 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
1815   base::StatsCounter used_push_streams("spdy.claimed_push_streams");
1816
1817   PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
1818   if (unclaimed_it == unclaimed_pushed_streams_.end())
1819     return base::WeakPtr<SpdyStream>();
1820
1821   SpdyStreamId stream_id = unclaimed_it->second.stream_id;
1822   unclaimed_pushed_streams_.erase(unclaimed_it);
1823
1824   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
1825   if (active_it == active_streams_.end()) {
1826     NOTREACHED();
1827     return base::WeakPtr<SpdyStream>();
1828   }
1829
1830   net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM);
1831   used_push_streams.Increment();
1832   return active_it->second.stream->GetWeakPtr();
1833 }
1834
1835 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info,
1836                              bool* was_npn_negotiated,
1837                              NextProto* protocol_negotiated) {
1838   *was_npn_negotiated = connection_->socket()->WasNpnNegotiated();
1839   *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol();
1840   return connection_->socket()->GetSSLInfo(ssl_info);
1841 }
1842
1843 bool SpdySession::GetSSLCertRequestInfo(
1844     SSLCertRequestInfo* cert_request_info) {
1845   if (!is_secure_)
1846     return false;
1847   GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
1848   return true;
1849 }
1850
1851 void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
1852   CHECK(in_io_loop_);
1853
1854   if (availability_state_ == STATE_CLOSED)
1855     return;
1856
1857   RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code));
1858   std::string description = base::StringPrintf(
1859       "SPDY_ERROR error_code: %d.", error_code);
1860   CloseSessionResult result =
1861       DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, description);
1862   DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
1863 }
1864
1865 void SpdySession::OnStreamError(SpdyStreamId stream_id,
1866                                 const std::string& description) {
1867   CHECK(in_io_loop_);
1868
1869   if (availability_state_ == STATE_CLOSED)
1870     return;
1871
1872   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1873   if (it == active_streams_.end()) {
1874     // We still want to send a frame to reset the stream even if we
1875     // don't know anything about it.
1876     EnqueueResetStreamFrame(
1877         stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
1878     return;
1879   }
1880
1881   ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
1882 }
1883
1884 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id,
1885                                     size_t length,
1886                                     bool fin) {
1887   CHECK(in_io_loop_);
1888
1889   if (availability_state_ == STATE_CLOSED)
1890     return;
1891
1892   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1893
1894   // By the time data comes in, the stream may already be inactive.
1895   if (it == active_streams_.end())
1896     return;
1897
1898   SpdyStream* stream = it->second.stream;
1899   CHECK_EQ(stream->stream_id(), stream_id);
1900
1901   DCHECK(buffered_spdy_framer_);
1902   size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize();
1903   stream->IncrementRawReceivedBytes(header_len);
1904 }
1905
1906 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
1907                                     const char* data,
1908                                     size_t len,
1909                                     bool fin) {
1910   CHECK(in_io_loop_);
1911
1912   if (availability_state_ == STATE_CLOSED)
1913     return;
1914
1915   if (data == NULL && len != 0) {
1916     // This is notification of consumed data padding.
1917     // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames.
1918     // See crbug.com/353012.
1919     return;
1920   }
1921
1922   DCHECK_LT(len, 1u << 24);
1923   if (net_log().IsLogging()) {
1924     net_log().AddEvent(
1925         NetLog::TYPE_SPDY_SESSION_RECV_DATA,
1926         base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
1927   }
1928
1929   // Build the buffer as early as possible so that we go through the
1930   // session flow control checks and update
1931   // |unacked_recv_window_bytes_| properly even when the stream is
1932   // inactive (since the other side has still reduced its session send
1933   // window).
1934   scoped_ptr<SpdyBuffer> buffer;
1935   if (data) {
1936     DCHECK_GT(len, 0u);
1937     CHECK_LE(len, static_cast<size_t>(kReadBufferSize));
1938     buffer.reset(new SpdyBuffer(data, len));
1939
1940     if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1941       DecreaseRecvWindowSize(static_cast<int32>(len));
1942       buffer->AddConsumeCallback(
1943           base::Bind(&SpdySession::OnReadBufferConsumed,
1944                      weak_factory_.GetWeakPtr()));
1945     }
1946   } else {
1947     DCHECK_EQ(len, 0u);
1948   }
1949
1950   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1951
1952   // By the time data comes in, the stream may already be inactive.
1953   if (it == active_streams_.end())
1954     return;
1955
1956   SpdyStream* stream = it->second.stream;
1957   CHECK_EQ(stream->stream_id(), stream_id);
1958
1959   stream->IncrementRawReceivedBytes(len);
1960
1961   if (it->second.waiting_for_syn_reply) {
1962     const std::string& error = "Data received before SYN_REPLY.";
1963     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
1964     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
1965     return;
1966   }
1967
1968   stream->OnDataReceived(buffer.Pass());
1969 }
1970
1971 void SpdySession::OnSettings(bool clear_persisted) {
1972   CHECK(in_io_loop_);
1973
1974   if (availability_state_ == STATE_CLOSED)
1975     return;
1976
1977   if (clear_persisted)
1978     http_server_properties_->ClearSpdySettings(host_port_pair());
1979
1980   if (net_log_.IsLogging()) {
1981     net_log_.AddEvent(
1982         NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
1983         base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
1984                    clear_persisted));
1985   }
1986 }
1987
1988 void SpdySession::OnSetting(SpdySettingsIds id,
1989                             uint8 flags,
1990                             uint32 value) {
1991   CHECK(in_io_loop_);
1992
1993   if (availability_state_ == STATE_CLOSED)
1994     return;
1995
1996   HandleSetting(id, value);
1997   http_server_properties_->SetSpdySetting(
1998       host_port_pair(),
1999       id,
2000       static_cast<SpdySettingsFlags>(flags),
2001       value);
2002   received_settings_ = true;
2003
2004   // Log the setting.
2005   net_log_.AddEvent(
2006       NetLog::TYPE_SPDY_SESSION_RECV_SETTING,
2007       base::Bind(&NetLogSpdySettingCallback,
2008                  id, static_cast<SpdySettingsFlags>(flags), value));
2009 }
2010
2011 void SpdySession::OnSendCompressedFrame(
2012     SpdyStreamId stream_id,
2013     SpdyFrameType type,
2014     size_t payload_len,
2015     size_t frame_len) {
2016   if (type != SYN_STREAM)
2017     return;
2018
2019   DCHECK(buffered_spdy_framer_.get());
2020   size_t compressed_len =
2021       frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize();
2022
2023   if (payload_len) {
2024     // Make sure we avoid early decimal truncation.
2025     int compression_pct = 100 - (100 * compressed_len) / payload_len;
2026     UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
2027                              compression_pct);
2028   }
2029 }
2030
2031 void SpdySession::OnReceiveCompressedFrame(
2032     SpdyStreamId stream_id,
2033     SpdyFrameType type,
2034     size_t frame_len) {
2035   last_compressed_frame_len_ = frame_len;
2036 }
2037
2038 int SpdySession::OnInitialResponseHeadersReceived(
2039     const SpdyHeaderBlock& response_headers,
2040     base::Time response_time,
2041     base::TimeTicks recv_first_byte_time,
2042     SpdyStream* stream) {
2043   CHECK(in_io_loop_);
2044   SpdyStreamId stream_id = stream->stream_id();
2045   // May invalidate |stream|.
2046   int rv = stream->OnInitialResponseHeadersReceived(
2047       response_headers, response_time, recv_first_byte_time);
2048   if (rv < 0) {
2049     DCHECK_NE(rv, ERR_IO_PENDING);
2050     DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2051   }
2052   return rv;
2053 }
2054
2055 void SpdySession::OnSynStream(SpdyStreamId stream_id,
2056                               SpdyStreamId associated_stream_id,
2057                               SpdyPriority priority,
2058                               bool fin,
2059                               bool unidirectional,
2060                               const SpdyHeaderBlock& headers) {
2061   CHECK(in_io_loop_);
2062
2063   if (availability_state_ == STATE_CLOSED)
2064     return;
2065
2066   base::Time response_time = base::Time::Now();
2067   base::TimeTicks recv_first_byte_time = time_func_();
2068
2069   if (net_log_.IsLogging()) {
2070     net_log_.AddEvent(
2071         NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
2072         base::Bind(&NetLogSpdySynStreamReceivedCallback,
2073                    &headers, fin, unidirectional, priority,
2074                    stream_id, associated_stream_id));
2075   }
2076
2077   // Server-initiated streams should have even sequence numbers.
2078   if ((stream_id & 0x1) != 0) {
2079     LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id;
2080     return;
2081   }
2082
2083   if (IsStreamActive(stream_id)) {
2084     LOG(WARNING) << "Received OnSyn for active stream " << stream_id;
2085     return;
2086   }
2087
2088   RequestPriority request_priority =
2089       ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion());
2090
2091   if (availability_state_ == STATE_GOING_AWAY) {
2092     // TODO(akalin): This behavior isn't in the SPDY spec, although it
2093     // probably should be.
2094     EnqueueResetStreamFrame(stream_id, request_priority,
2095                             RST_STREAM_REFUSED_STREAM,
2096                             "OnSyn received when going away");
2097     return;
2098   }
2099
2100   // TODO(jgraettinger): SpdyFramer simulates OnSynStream() from HEADERS
2101   // frames, which don't convey associated stream ID. Disable this check
2102   // for now, and re-enable when PUSH_PROMISE is implemented properly.
2103   if (associated_stream_id == 0 && GetProtocolVersion() < SPDY4) {
2104     std::string description = base::StringPrintf(
2105         "Received invalid OnSyn associated stream id %d for stream %d",
2106         associated_stream_id, stream_id);
2107     EnqueueResetStreamFrame(stream_id, request_priority,
2108                             RST_STREAM_REFUSED_STREAM, description);
2109     return;
2110   }
2111
2112   streams_pushed_count_++;
2113
2114   // TODO(mbelshe): DCHECK that this is a GET method?
2115
2116   // Verify that the response had a URL for us.
2117   GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true);
2118   if (!gurl.is_valid()) {
2119     EnqueueResetStreamFrame(
2120         stream_id, request_priority, RST_STREAM_PROTOCOL_ERROR,
2121         "Pushed stream url was invalid: " + gurl.spec());
2122     return;
2123   }
2124
2125   // Verify we have a valid stream association.
2126   ActiveStreamMap::iterator associated_it =
2127       active_streams_.find(associated_stream_id);
2128   // TODO(jgraettinger): (See PUSH_PROMISE comment above).
2129   if (GetProtocolVersion() < SPDY4 && associated_it == active_streams_.end()) {
2130     EnqueueResetStreamFrame(
2131         stream_id, request_priority, RST_STREAM_INVALID_STREAM,
2132         base::StringPrintf(
2133             "Received OnSyn with inactive associated stream %d",
2134             associated_stream_id));
2135     return;
2136   }
2137
2138   // Check that the SYN advertises the same origin as its associated stream.
2139   // Bypass this check if and only if this session is with a SPDY proxy that
2140   // is trusted explicitly via the --trusted-spdy-proxy switch.
2141   if (trusted_spdy_proxy_.Equals(host_port_pair())) {
2142     // Disallow pushing of HTTPS content.
2143     if (gurl.SchemeIs("https")) {
2144       EnqueueResetStreamFrame(
2145           stream_id, request_priority, RST_STREAM_REFUSED_STREAM,
2146           base::StringPrintf(
2147               "Rejected push of Cross Origin HTTPS content %d",
2148               associated_stream_id));
2149     }
2150   } else if (GetProtocolVersion() < SPDY4) {
2151     // TODO(jgraettinger): (See PUSH_PROMISE comment above).
2152     GURL associated_url(associated_it->second.stream->GetUrlFromHeaders());
2153     if (associated_url.GetOrigin() != gurl.GetOrigin()) {
2154       EnqueueResetStreamFrame(
2155           stream_id, request_priority, RST_STREAM_REFUSED_STREAM,
2156           base::StringPrintf(
2157               "Rejected Cross Origin Push Stream %d",
2158               associated_stream_id));
2159       return;
2160     }
2161   }
2162
2163   // There should not be an existing pushed stream with the same path.
2164   PushedStreamMap::iterator pushed_it =
2165       unclaimed_pushed_streams_.lower_bound(gurl);
2166   if (pushed_it != unclaimed_pushed_streams_.end() &&
2167       pushed_it->first == gurl) {
2168     EnqueueResetStreamFrame(
2169         stream_id, request_priority, RST_STREAM_PROTOCOL_ERROR,
2170         "Received duplicate pushed stream with url: " +
2171         gurl.spec());
2172     return;
2173   }
2174
2175   scoped_ptr<SpdyStream> stream(
2176       new SpdyStream(SPDY_PUSH_STREAM, GetWeakPtr(), gurl,
2177                      request_priority,
2178                      stream_initial_send_window_size_,
2179                      stream_initial_recv_window_size_,
2180                      net_log_));
2181   stream->set_stream_id(stream_id);
2182   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2183   last_compressed_frame_len_ = 0;
2184
2185   DeleteExpiredPushedStreams();
2186   PushedStreamMap::iterator inserted_pushed_it =
2187       unclaimed_pushed_streams_.insert(
2188           pushed_it,
2189           std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_())));
2190   DCHECK(inserted_pushed_it != pushed_it);
2191
2192   InsertActivatedStream(stream.Pass());
2193
2194   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2195   if (active_it == active_streams_.end()) {
2196     NOTREACHED();
2197     return;
2198   }
2199
2200   // Parse the headers.
2201   if (OnInitialResponseHeadersReceived(
2202           headers, response_time,
2203           recv_first_byte_time, active_it->second.stream) != OK)
2204     return;
2205
2206   base::StatsCounter push_requests("spdy.pushed_streams");
2207   push_requests.Increment();
2208 }
2209
2210 void SpdySession::DeleteExpiredPushedStreams() {
2211   if (unclaimed_pushed_streams_.empty())
2212     return;
2213
2214   // Check that adequate time has elapsed since the last sweep.
2215   if (time_func_() < next_unclaimed_push_stream_sweep_time_)
2216     return;
2217
2218   // Gather old streams to delete.
2219   base::TimeTicks minimum_freshness = time_func_() -
2220       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2221   std::vector<SpdyStreamId> streams_to_close;
2222   for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
2223        it != unclaimed_pushed_streams_.end(); ++it) {
2224     if (minimum_freshness > it->second.creation_time)
2225       streams_to_close.push_back(it->second.stream_id);
2226   }
2227
2228   for (std::vector<SpdyStreamId>::const_iterator to_close_it =
2229            streams_to_close.begin();
2230        to_close_it != streams_to_close.end(); ++to_close_it) {
2231     ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it);
2232     if (active_it == active_streams_.end())
2233       continue;
2234
2235     LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM);
2236     // CloseActiveStreamIterator() will remove the stream from
2237     // |unclaimed_pushed_streams_|.
2238     ResetStreamIterator(
2239         active_it, RST_STREAM_REFUSED_STREAM, "Stream not claimed.");
2240   }
2241
2242   next_unclaimed_push_stream_sweep_time_ = time_func_() +
2243       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2244 }
2245
2246 void SpdySession::OnSynReply(SpdyStreamId stream_id,
2247                              bool fin,
2248                              const SpdyHeaderBlock& headers) {
2249   CHECK(in_io_loop_);
2250
2251   if (availability_state_ == STATE_CLOSED)
2252     return;
2253
2254   base::Time response_time = base::Time::Now();
2255   base::TimeTicks recv_first_byte_time = time_func_();
2256
2257   if (net_log().IsLogging()) {
2258     net_log().AddEvent(
2259         NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
2260         base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2261                    &headers, fin, stream_id));
2262   }
2263
2264   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2265   if (it == active_streams_.end()) {
2266     // NOTE:  it may just be that the stream was cancelled.
2267     return;
2268   }
2269
2270   SpdyStream* stream = it->second.stream;
2271   CHECK_EQ(stream->stream_id(), stream_id);
2272
2273   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2274   last_compressed_frame_len_ = 0;
2275
2276   if (GetProtocolVersion() >= SPDY4) {
2277     const std::string& error =
2278         "SPDY4 wasn't expecting SYN_REPLY.";
2279     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2280     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2281     return;
2282   }
2283   if (!it->second.waiting_for_syn_reply) {
2284     const std::string& error =
2285         "Received duplicate SYN_REPLY for stream.";
2286     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2287     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2288     return;
2289   }
2290   it->second.waiting_for_syn_reply = false;
2291
2292   ignore_result(OnInitialResponseHeadersReceived(
2293       headers, response_time, recv_first_byte_time, stream));
2294 }
2295
2296 void SpdySession::OnHeaders(SpdyStreamId stream_id,
2297                             bool fin,
2298                             const SpdyHeaderBlock& headers) {
2299   CHECK(in_io_loop_);
2300
2301   if (availability_state_ == STATE_CLOSED)
2302     return;
2303
2304   if (net_log().IsLogging()) {
2305     net_log().AddEvent(
2306         NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
2307         base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2308                    &headers, fin, stream_id));
2309   }
2310
2311   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2312   if (it == active_streams_.end()) {
2313     // NOTE:  it may just be that the stream was cancelled.
2314     LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
2315     return;
2316   }
2317
2318   SpdyStream* stream = it->second.stream;
2319   CHECK_EQ(stream->stream_id(), stream_id);
2320
2321   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2322   last_compressed_frame_len_ = 0;
2323
2324   if (it->second.waiting_for_syn_reply) {
2325     if (GetProtocolVersion() < SPDY4) {
2326       const std::string& error =
2327           "Was expecting SYN_REPLY, not HEADERS.";
2328       stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2329       ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2330       return;
2331     }
2332     base::Time response_time = base::Time::Now();
2333     base::TimeTicks recv_first_byte_time = time_func_();
2334
2335     it->second.waiting_for_syn_reply = false;
2336     ignore_result(OnInitialResponseHeadersReceived(
2337         headers, response_time, recv_first_byte_time, stream));
2338   } else {
2339     int rv = stream->OnAdditionalResponseHeadersReceived(headers);
2340     if (rv < 0) {
2341       DCHECK_NE(rv, ERR_IO_PENDING);
2342       DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2343     }
2344   }
2345 }
2346
2347 void SpdySession::OnRstStream(SpdyStreamId stream_id,
2348                               SpdyRstStreamStatus status) {
2349   CHECK(in_io_loop_);
2350
2351   if (availability_state_ == STATE_CLOSED)
2352     return;
2353
2354   std::string description;
2355   net_log().AddEvent(
2356       NetLog::TYPE_SPDY_SESSION_RST_STREAM,
2357       base::Bind(&NetLogSpdyRstCallback,
2358                  stream_id, status, &description));
2359
2360   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2361   if (it == active_streams_.end()) {
2362     // NOTE:  it may just be that the stream was cancelled.
2363     LOG(WARNING) << "Received RST for invalid stream" << stream_id;
2364     return;
2365   }
2366
2367   CHECK_EQ(it->second.stream->stream_id(), stream_id);
2368
2369   if (status == 0) {
2370     it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
2371   } else if (status == RST_STREAM_REFUSED_STREAM) {
2372     CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM);
2373   } else {
2374     RecordProtocolErrorHistogram(
2375         PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
2376     it->second.stream->LogStreamError(
2377         ERR_SPDY_PROTOCOL_ERROR,
2378         base::StringPrintf("SPDY stream closed with status: %d", status));
2379     // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
2380     //                For now, it doesn't matter much - it is a protocol error.
2381     CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
2382   }
2383 }
2384
2385 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
2386                            SpdyGoAwayStatus status) {
2387   CHECK(in_io_loop_);
2388
2389   if (availability_state_ == STATE_CLOSED)
2390     return;
2391
2392   net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
2393       base::Bind(&NetLogSpdyGoAwayCallback,
2394                  last_accepted_stream_id,
2395                  active_streams_.size(),
2396                  unclaimed_pushed_streams_.size(),
2397                  status));
2398   MakeUnavailable();
2399   StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
2400   // This is to handle the case when we already don't have any active
2401   // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
2402   // active streams and so the last one being closed will finish the
2403   // going away process (see DeleteStream()).
2404   MaybeFinishGoingAway();
2405 }
2406
2407 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) {
2408   CHECK(in_io_loop_);
2409
2410   if (availability_state_ == STATE_CLOSED)
2411     return;
2412
2413   net_log_.AddEvent(
2414       NetLog::TYPE_SPDY_SESSION_PING,
2415       base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received"));
2416
2417   // Send response to a PING from server.
2418   if ((protocol_ >= kProtoSPDY4 && !is_ack) ||
2419       (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) {
2420     WritePingFrame(unique_id, true);
2421     return;
2422   }
2423
2424   --pings_in_flight_;
2425   if (pings_in_flight_ < 0) {
2426     RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
2427     CloseSessionResult result =
2428         DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
2429     DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
2430     pings_in_flight_ = 0;
2431     return;
2432   }
2433
2434   if (pings_in_flight_ > 0)
2435     return;
2436
2437   // We will record RTT in histogram when there are no more client sent
2438   // pings_in_flight_.
2439   RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
2440 }
2441
2442 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
2443                                  uint32 delta_window_size) {
2444   CHECK(in_io_loop_);
2445
2446   if (availability_state_ == STATE_CLOSED)
2447     return;
2448
2449   DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
2450   net_log_.AddEvent(
2451       NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
2452       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2453                  stream_id, delta_window_size));
2454
2455   if (stream_id == kSessionFlowControlStreamId) {
2456     // WINDOW_UPDATE for the session.
2457     if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
2458       LOG(WARNING) << "Received WINDOW_UPDATE for session when "
2459                    << "session flow control is not turned on";
2460       // TODO(akalin): Record an error and close the session.
2461       return;
2462     }
2463
2464     if (delta_window_size < 1u) {
2465       RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2466       CloseSessionResult result = DoCloseSession(
2467           ERR_SPDY_PROTOCOL_ERROR,
2468           "Received WINDOW_UPDATE with an invalid delta_window_size " +
2469           base::UintToString(delta_window_size));
2470       DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
2471       return;
2472     }
2473
2474     IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
2475   } else {
2476     // WINDOW_UPDATE for a stream.
2477     if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2478       // TODO(akalin): Record an error and close the session.
2479       LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
2480                    << " when flow control is not turned on";
2481       return;
2482     }
2483
2484     ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2485
2486     if (it == active_streams_.end()) {
2487       // NOTE:  it may just be that the stream was cancelled.
2488       LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
2489       return;
2490     }
2491
2492     SpdyStream* stream = it->second.stream;
2493     CHECK_EQ(stream->stream_id(), stream_id);
2494
2495     if (delta_window_size < 1u) {
2496       ResetStreamIterator(it,
2497                           RST_STREAM_FLOW_CONTROL_ERROR,
2498                           base::StringPrintf(
2499                               "Received WINDOW_UPDATE with an invalid "
2500                               "delta_window_size %ud", delta_window_size));
2501       return;
2502     }
2503
2504     CHECK_EQ(it->second.stream->stream_id(), stream_id);
2505     it->second.stream->IncreaseSendWindowSize(
2506         static_cast<int32>(delta_window_size));
2507   }
2508 }
2509
2510 void SpdySession::OnPushPromise(SpdyStreamId stream_id,
2511                                 SpdyStreamId promised_stream_id) {
2512   // TODO(akalin): Handle PUSH_PROMISE frames.
2513 }
2514
2515 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id,
2516                                          uint32 delta_window_size) {
2517   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2518   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2519   CHECK(it != active_streams_.end());
2520   CHECK_EQ(it->second.stream->stream_id(), stream_id);
2521   SendWindowUpdateFrame(
2522       stream_id, delta_window_size, it->second.stream->priority());
2523 }
2524
2525 void SpdySession::SendInitialData() {
2526   DCHECK(enable_sending_initial_data_);
2527   DCHECK_NE(availability_state_, STATE_CLOSED);
2528
2529   if (send_connection_header_prefix_) {
2530     DCHECK_EQ(protocol_, kProtoSPDY4);
2531     scoped_ptr<SpdyFrame> connection_header_prefix_frame(
2532         new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix),
2533                       kHttp2ConnectionHeaderPrefixSize,
2534                       false /* take_ownership */));
2535     // Count the prefix as part of the subsequent SETTINGS frame.
2536     EnqueueSessionWrite(HIGHEST, SETTINGS,
2537                         connection_header_prefix_frame.Pass());
2538   }
2539
2540   // First, notify the server about the settings they should use when
2541   // communicating with us.
2542   SettingsMap settings_map;
2543   // Create a new settings frame notifying the server of our
2544   // max concurrent streams and initial window size.
2545   settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] =
2546       SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams);
2547   if (flow_control_state_ >= FLOW_CONTROL_STREAM &&
2548       stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) {
2549     settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
2550         SettingsFlagsAndValue(SETTINGS_FLAG_NONE,
2551                               stream_initial_recv_window_size_);
2552   }
2553   SendSettings(settings_map);
2554
2555   // Next, notify the server about our initial recv window size.
2556   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2557     // Bump up the receive window size to the real initial value. This
2558     // has to go here since the WINDOW_UPDATE frame sent by
2559     // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|.
2560     DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_);
2561     // This condition implies that |kDefaultInitialRecvWindowSize| -
2562     // |session_recv_window_size_| doesn't overflow.
2563     DCHECK_GT(session_recv_window_size_, 0);
2564     IncreaseRecvWindowSize(
2565         kDefaultInitialRecvWindowSize - session_recv_window_size_);
2566   }
2567
2568   // Finally, notify the server about the settings they have
2569   // previously told us to use when communicating with them (after
2570   // applying them).
2571   const SettingsMap& server_settings_map =
2572       http_server_properties_->GetSpdySettings(host_port_pair());
2573   if (server_settings_map.empty())
2574     return;
2575
2576   SettingsMap::const_iterator it =
2577       server_settings_map.find(SETTINGS_CURRENT_CWND);
2578   uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0;
2579   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100);
2580
2581   for (SettingsMap::const_iterator it = server_settings_map.begin();
2582        it != server_settings_map.end(); ++it) {
2583     const SpdySettingsIds new_id = it->first;
2584     const uint32 new_val = it->second.second;
2585     HandleSetting(new_id, new_val);
2586   }
2587
2588   SendSettings(server_settings_map);
2589 }
2590
2591
2592 void SpdySession::SendSettings(const SettingsMap& settings) {
2593   DCHECK_NE(availability_state_, STATE_CLOSED);
2594
2595   net_log_.AddEvent(
2596       NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
2597       base::Bind(&NetLogSpdySendSettingsCallback, &settings));
2598
2599   // Create the SETTINGS frame and send it.
2600   DCHECK(buffered_spdy_framer_.get());
2601   scoped_ptr<SpdyFrame> settings_frame(
2602       buffered_spdy_framer_->CreateSettings(settings));
2603   sent_settings_ = true;
2604   EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
2605 }
2606
2607 void SpdySession::HandleSetting(uint32 id, uint32 value) {
2608   switch (id) {
2609     case SETTINGS_MAX_CONCURRENT_STREAMS:
2610       max_concurrent_streams_ = std::min(static_cast<size_t>(value),
2611                                          kMaxConcurrentStreamLimit);
2612       ProcessPendingStreamRequests();
2613       break;
2614     case SETTINGS_INITIAL_WINDOW_SIZE: {
2615       if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2616         net_log().AddEvent(
2617             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL);
2618         return;
2619       }
2620
2621       if (value > static_cast<uint32>(kint32max)) {
2622         net_log().AddEvent(
2623             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE,
2624             NetLog::IntegerCallback("initial_window_size", value));
2625         return;
2626       }
2627
2628       // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only.
2629       int32 delta_window_size =
2630           static_cast<int32>(value) - stream_initial_send_window_size_;
2631       stream_initial_send_window_size_ = static_cast<int32>(value);
2632       UpdateStreamsSendWindowSize(delta_window_size);
2633       net_log().AddEvent(
2634           NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE,
2635           NetLog::IntegerCallback("delta_window_size", delta_window_size));
2636       break;
2637     }
2638   }
2639 }
2640
2641 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
2642   DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2643   for (ActiveStreamMap::iterator it = active_streams_.begin();
2644        it != active_streams_.end(); ++it) {
2645     it->second.stream->AdjustSendWindowSize(delta_window_size);
2646   }
2647
2648   for (CreatedStreamSet::const_iterator it = created_streams_.begin();
2649        it != created_streams_.end(); it++) {
2650     (*it)->AdjustSendWindowSize(delta_window_size);
2651   }
2652 }
2653
2654 void SpdySession::SendPrefacePingIfNoneInFlight() {
2655   if (pings_in_flight_ || !enable_ping_based_connection_checking_)
2656     return;
2657
2658   base::TimeTicks now = time_func_();
2659   // If there is no activity in the session, then send a preface-PING.
2660   if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
2661     SendPrefacePing();
2662 }
2663
2664 void SpdySession::SendPrefacePing() {
2665   WritePingFrame(next_ping_id_, false);
2666 }
2667
2668 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
2669                                         uint32 delta_window_size,
2670                                         RequestPriority priority) {
2671   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2672   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2673   if (it != active_streams_.end()) {
2674     CHECK_EQ(it->second.stream->stream_id(), stream_id);
2675   } else {
2676     CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2677     CHECK_EQ(stream_id, kSessionFlowControlStreamId);
2678   }
2679
2680   net_log_.AddEvent(
2681       NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
2682       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2683                  stream_id, delta_window_size));
2684
2685   DCHECK(buffered_spdy_framer_.get());
2686   scoped_ptr<SpdyFrame> window_update_frame(
2687       buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
2688   EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass());
2689 }
2690
2691 void SpdySession::WritePingFrame(uint32 unique_id, bool is_ack) {
2692   DCHECK(buffered_spdy_framer_.get());
2693   scoped_ptr<SpdyFrame> ping_frame(
2694       buffered_spdy_framer_->CreatePingFrame(unique_id, is_ack));
2695   EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass());
2696
2697   if (net_log().IsLogging()) {
2698     net_log().AddEvent(
2699         NetLog::TYPE_SPDY_SESSION_PING,
2700         base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "sent"));
2701   }
2702   if (!is_ack) {
2703     next_ping_id_ += 2;
2704     ++pings_in_flight_;
2705     PlanToCheckPingStatus();
2706     last_ping_sent_time_ = time_func_();
2707   }
2708 }
2709
2710 void SpdySession::PlanToCheckPingStatus() {
2711   if (check_ping_status_pending_)
2712     return;
2713
2714   check_ping_status_pending_ = true;
2715   base::MessageLoop::current()->PostDelayedTask(
2716       FROM_HERE,
2717       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2718                  time_func_()), hung_interval_);
2719 }
2720
2721 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
2722   CHECK(!in_io_loop_);
2723   DCHECK_NE(availability_state_, STATE_CLOSED);
2724
2725   // Check if we got a response back for all PINGs we had sent.
2726   if (pings_in_flight_ == 0) {
2727     check_ping_status_pending_ = false;
2728     return;
2729   }
2730
2731   DCHECK(check_ping_status_pending_);
2732
2733   base::TimeTicks now = time_func_();
2734   base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
2735
2736   if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
2737     // Track all failed PING messages in a separate bucket.
2738     RecordPingRTTHistogram(base::TimeDelta::Max());
2739     CloseSessionResult result =
2740         DoCloseSession(ERR_SPDY_PING_FAILED, "Failed ping.");
2741     DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
2742     return;
2743   }
2744
2745   // Check the status of connection after a delay.
2746   base::MessageLoop::current()->PostDelayedTask(
2747       FROM_HERE,
2748       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2749                  now),
2750       delay);
2751 }
2752
2753 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) {
2754   UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration);
2755 }
2756
2757 void SpdySession::RecordProtocolErrorHistogram(
2758     SpdyProtocolErrorDetails details) {
2759   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details,
2760                             NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2761   if (EndsWith(host_port_pair().host(), "google.com", false)) {
2762     UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details,
2763                               NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2764   }
2765 }
2766
2767 void SpdySession::RecordHistograms() {
2768   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
2769                               streams_initiated_count_,
2770                               0, 300, 50);
2771   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
2772                               streams_pushed_count_,
2773                               0, 300, 50);
2774   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
2775                               streams_pushed_and_claimed_count_,
2776                               0, 300, 50);
2777   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
2778                               streams_abandoned_count_,
2779                               0, 300, 50);
2780   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
2781                             sent_settings_ ? 1 : 0, 2);
2782   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
2783                             received_settings_ ? 1 : 0, 2);
2784   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
2785                               stalled_streams_,
2786                               0, 300, 50);
2787   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
2788                             stalled_streams_ > 0 ? 1 : 0, 2);
2789
2790   if (received_settings_) {
2791     // Enumerate the saved settings, and set histograms for it.
2792     const SettingsMap& settings_map =
2793         http_server_properties_->GetSpdySettings(host_port_pair());
2794
2795     SettingsMap::const_iterator it;
2796     for (it = settings_map.begin(); it != settings_map.end(); ++it) {
2797       const SpdySettingsIds id = it->first;
2798       const uint32 val = it->second.second;
2799       switch (id) {
2800         case SETTINGS_CURRENT_CWND:
2801           // Record several different histograms to see if cwnd converges
2802           // for larger volumes of data being sent.
2803           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
2804                                       val, 1, 200, 100);
2805           if (total_bytes_received_ > 10 * 1024) {
2806             UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
2807                                         val, 1, 200, 100);
2808             if (total_bytes_received_ > 25 * 1024) {
2809               UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
2810                                           val, 1, 200, 100);
2811               if (total_bytes_received_ > 50 * 1024) {
2812                 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
2813                                             val, 1, 200, 100);
2814                 if (total_bytes_received_ > 100 * 1024) {
2815                   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
2816                                               val, 1, 200, 100);
2817                 }
2818               }
2819             }
2820           }
2821           break;
2822         case SETTINGS_ROUND_TRIP_TIME:
2823           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
2824                                       val, 1, 1200, 100);
2825           break;
2826         case SETTINGS_DOWNLOAD_RETRANS_RATE:
2827           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
2828                                       val, 1, 100, 50);
2829           break;
2830         default:
2831           break;
2832       }
2833     }
2834   }
2835 }
2836
2837 void SpdySession::CompleteStreamRequest(
2838     const base::WeakPtr<SpdyStreamRequest>& pending_request) {
2839   // Abort if the request has already been cancelled.
2840   if (!pending_request)
2841     return;
2842
2843   base::WeakPtr<SpdyStream> stream;
2844   int rv = CreateStream(*pending_request, &stream);
2845
2846   if (rv == OK) {
2847     DCHECK(stream);
2848     pending_request->OnRequestCompleteSuccess(stream);
2849   } else {
2850     DCHECK(!stream);
2851     pending_request->OnRequestCompleteFailure(rv);
2852   }
2853 }
2854
2855 SSLClientSocket* SpdySession::GetSSLClientSocket() const {
2856   if (!is_secure_)
2857     return NULL;
2858   SSLClientSocket* ssl_socket =
2859       reinterpret_cast<SSLClientSocket*>(connection_->socket());
2860   DCHECK(ssl_socket);
2861   return ssl_socket;
2862 }
2863
2864 void SpdySession::OnWriteBufferConsumed(
2865     size_t frame_payload_size,
2866     size_t consume_size,
2867     SpdyBuffer::ConsumeSource consume_source) {
2868   // We can be called with |in_io_loop_| set if a write SpdyBuffer is
2869   // deleted (e.g., a stream is closed due to incoming data).
2870
2871   if (availability_state_ == STATE_CLOSED)
2872     return;
2873
2874   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2875
2876   if (consume_source == SpdyBuffer::DISCARD) {
2877     // If we're discarding a frame or part of it, increase the send
2878     // window by the number of discarded bytes. (Although if we're
2879     // discarding part of a frame, it's probably because of a write
2880     // error and we'll be tearing down the session soon.)
2881     size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
2882     DCHECK_GT(remaining_payload_bytes, 0u);
2883     IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
2884   }
2885   // For consumed bytes, the send window is increased when we receive
2886   // a WINDOW_UPDATE frame.
2887 }
2888
2889 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
2890   // We can be called with |in_io_loop_| set if a SpdyBuffer is
2891   // deleted (e.g., a stream is closed due to incoming data).
2892
2893   DCHECK_NE(availability_state_, STATE_CLOSED);
2894   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2895   DCHECK_GE(delta_window_size, 1);
2896
2897   // Check for overflow.
2898   int32 max_delta_window_size = kint32max - session_send_window_size_;
2899   if (delta_window_size > max_delta_window_size) {
2900     RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2901     CloseSessionResult result = DoCloseSession(
2902         ERR_SPDY_PROTOCOL_ERROR,
2903         "Received WINDOW_UPDATE [delta: " +
2904         base::IntToString(delta_window_size) +
2905         "] for session overflows session_send_window_size_ [current: " +
2906         base::IntToString(session_send_window_size_) + "]");
2907     DCHECK_NE(result, SESSION_ALREADY_CLOSED);
2908     return;
2909   }
2910
2911   session_send_window_size_ += delta_window_size;
2912
2913   net_log_.AddEvent(
2914       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
2915       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2916                  delta_window_size, session_send_window_size_));
2917
2918   DCHECK(!IsSendStalled());
2919   ResumeSendStalledStreams();
2920 }
2921
2922 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
2923   DCHECK_NE(availability_state_, STATE_CLOSED);
2924   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2925
2926   // We only call this method when sending a frame. Therefore,
2927   // |delta_window_size| should be within the valid frame size range.
2928   DCHECK_GE(delta_window_size, 1);
2929   DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
2930
2931   // |send_window_size_| should have been at least |delta_window_size| for
2932   // this call to happen.
2933   DCHECK_GE(session_send_window_size_, delta_window_size);
2934
2935   session_send_window_size_ -= delta_window_size;
2936
2937   net_log_.AddEvent(
2938       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
2939       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2940                  -delta_window_size, session_send_window_size_));
2941 }
2942
2943 void SpdySession::OnReadBufferConsumed(
2944     size_t consume_size,
2945     SpdyBuffer::ConsumeSource consume_source) {
2946   // We can be called with |in_io_loop_| set if a read SpdyBuffer is
2947   // deleted (e.g., discarded by a SpdyReadQueue).
2948
2949   if (availability_state_ == STATE_CLOSED)
2950     return;
2951
2952   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2953   DCHECK_GE(consume_size, 1u);
2954   DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
2955
2956   IncreaseRecvWindowSize(static_cast<int32>(consume_size));
2957 }
2958
2959 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
2960   DCHECK_NE(availability_state_, STATE_CLOSED);
2961   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2962   DCHECK_GE(session_unacked_recv_window_bytes_, 0);
2963   DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
2964   DCHECK_GE(delta_window_size, 1);
2965   // Check for overflow.
2966   DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
2967
2968   session_recv_window_size_ += delta_window_size;
2969   net_log_.AddEvent(
2970       NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
2971       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2972                  delta_window_size, session_recv_window_size_));
2973
2974   session_unacked_recv_window_bytes_ += delta_window_size;
2975   if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) {
2976     SendWindowUpdateFrame(kSessionFlowControlStreamId,
2977                           session_unacked_recv_window_bytes_,
2978                           HIGHEST);
2979     session_unacked_recv_window_bytes_ = 0;
2980   }
2981 }
2982
2983 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
2984   CHECK(in_io_loop_);
2985   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2986   DCHECK_GE(delta_window_size, 1);
2987
2988   // Since we never decrease the initial receive window size,
2989   // |delta_window_size| should never cause |recv_window_size_| to go
2990   // negative. If we do, the receive window isn't being respected.
2991   if (delta_window_size > session_recv_window_size_) {
2992     RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
2993     CloseSessionResult result = DoCloseSession(
2994         ERR_SPDY_PROTOCOL_ERROR,
2995         "delta_window_size is " + base::IntToString(delta_window_size) +
2996             " in DecreaseRecvWindowSize, which is larger than the receive " +
2997             "window size of " + base::IntToString(session_recv_window_size_));
2998     DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
2999     return;
3000   }
3001
3002   session_recv_window_size_ -= delta_window_size;
3003   net_log_.AddEvent(
3004       NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
3005       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3006                  -delta_window_size, session_recv_window_size_));
3007 }
3008
3009 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
3010   DCHECK(stream.send_stalled_by_flow_control());
3011   RequestPriority priority = stream.priority();
3012   CHECK_GE(priority, MINIMUM_PRIORITY);
3013   CHECK_LE(priority, MAXIMUM_PRIORITY);
3014   stream_send_unstall_queue_[priority].push_back(stream.stream_id());
3015 }
3016
3017 void SpdySession::ResumeSendStalledStreams() {
3018   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3019
3020   // We don't have to worry about new streams being queued, since
3021   // doing so would cause IsSendStalled() to return true. But we do
3022   // have to worry about streams being closed, as well as ourselves
3023   // being closed.
3024
3025   while (availability_state_ != STATE_CLOSED && !IsSendStalled()) {
3026     size_t old_size = 0;
3027 #if DCHECK_IS_ON
3028     old_size = GetTotalSize(stream_send_unstall_queue_);
3029 #endif
3030
3031     SpdyStreamId stream_id = PopStreamToPossiblyResume();
3032     if (stream_id == 0)
3033       break;
3034     ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
3035     // The stream may actually still be send-stalled after this (due
3036     // to its own send window) but that's okay -- it'll then be
3037     // resumed once its send window increases.
3038     if (it != active_streams_.end())
3039       it->second.stream->PossiblyResumeIfSendStalled();
3040
3041     // The size should decrease unless we got send-stalled again.
3042     if (!IsSendStalled())
3043       DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
3044   }
3045 }
3046
3047 SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
3048   for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
3049     std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
3050     if (!queue->empty()) {
3051       SpdyStreamId stream_id = queue->front();
3052       queue->pop_front();
3053       return stream_id;
3054     }
3055   }
3056   return 0;
3057 }
3058
3059 }  // namespace net