Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / net / quic / quic_session.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/quic/quic_session.h"
6
7 #include "base/stl_util.h"
8 #include "net/quic/crypto/proof_verifier.h"
9 #include "net/quic/quic_connection.h"
10 #include "net/quic/quic_flags.h"
11 #include "net/quic/quic_flow_controller.h"
12 #include "net/quic/quic_headers_stream.h"
13 #include "net/ssl/ssl_info.h"
14
15 using base::StringPiece;
16 using base::hash_map;
17 using base::hash_set;
18 using std::make_pair;
19 using std::vector;
20
21 namespace net {
22
23 #define ENDPOINT (is_server() ? "Server: " : " Client: ")
24
25 // We want to make sure we delete any closed streams in a safe manner.
26 // To avoid deleting a stream in mid-operation, we have a simple shim between
27 // us and the stream, so we can delete any streams when we return from
28 // processing.
29 //
30 // We could just override the base methods, but this makes it easier to make
31 // sure we don't miss any.
32 class VisitorShim : public QuicConnectionVisitorInterface {
33  public:
34   explicit VisitorShim(QuicSession* session) : session_(session) {}
35
36   virtual void OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE {
37     session_->OnStreamFrames(frames);
38     session_->PostProcessAfterData();
39   }
40   virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE {
41     session_->OnRstStream(frame);
42     session_->PostProcessAfterData();
43   }
44
45   virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE {
46     session_->OnGoAway(frame);
47     session_->PostProcessAfterData();
48   }
49
50   virtual void OnWindowUpdateFrames(const vector<QuicWindowUpdateFrame>& frames)
51       OVERRIDE {
52     session_->OnWindowUpdateFrames(frames);
53     session_->PostProcessAfterData();
54   }
55
56   virtual void OnBlockedFrames(const vector<QuicBlockedFrame>& frames)
57       OVERRIDE {
58     session_->OnBlockedFrames(frames);
59     session_->PostProcessAfterData();
60   }
61
62   virtual void OnCanWrite() OVERRIDE {
63     session_->OnCanWrite();
64     session_->PostProcessAfterData();
65   }
66
67   virtual void OnCongestionWindowChange(QuicTime now) OVERRIDE {
68     session_->OnCongestionWindowChange(now);
69   }
70
71   virtual void OnSuccessfulVersionNegotiation(
72       const QuicVersion& version) OVERRIDE {
73     session_->OnSuccessfulVersionNegotiation(version);
74   }
75
76   virtual void OnConnectionClosed(
77       QuicErrorCode error, bool from_peer) OVERRIDE {
78     session_->OnConnectionClosed(error, from_peer);
79     // The session will go away, so don't bother with cleanup.
80   }
81
82   virtual void OnWriteBlocked() OVERRIDE {
83     session_->OnWriteBlocked();
84   }
85
86   virtual bool WillingAndAbleToWrite() const OVERRIDE {
87     return session_->WillingAndAbleToWrite();
88   }
89
90   virtual bool HasPendingHandshake() const OVERRIDE {
91     return session_->HasPendingHandshake();
92   }
93
94   virtual bool HasOpenDataStreams() const OVERRIDE {
95     return session_->HasOpenDataStreams();
96   }
97
98  private:
99   QuicSession* session_;
100 };
101
102 QuicSession::QuicSession(QuicConnection* connection, const QuicConfig& config)
103     : connection_(connection),
104       visitor_shim_(new VisitorShim(this)),
105       config_(config),
106       max_open_streams_(config_.max_streams_per_connection()),
107       next_stream_id_(is_server() ? 2 : 5),
108       largest_peer_created_stream_id_(0),
109       error_(QUIC_NO_ERROR),
110       goaway_received_(false),
111       goaway_sent_(false),
112       has_pending_handshake_(false) {
113   if (connection_->version() <= QUIC_VERSION_19) {
114     flow_controller_.reset(new QuicFlowController(
115         connection_.get(), 0, is_server(), kDefaultFlowControlSendWindow,
116         config_.GetInitialFlowControlWindowToSend(),
117         config_.GetInitialFlowControlWindowToSend()));
118   } else {
119     flow_controller_.reset(new QuicFlowController(
120         connection_.get(), 0, is_server(), kDefaultFlowControlSendWindow,
121         config_.GetInitialSessionFlowControlWindowToSend(),
122         config_.GetInitialSessionFlowControlWindowToSend()));
123   }
124 }
125
126 void QuicSession::InitializeSession() {
127   connection_->set_visitor(visitor_shim_.get());
128   connection_->SetFromConfig(config_);
129   if (connection_->connected()) {
130     connection_->SetOverallConnectionTimeout(
131         config_.max_time_before_crypto_handshake());
132   }
133   headers_stream_.reset(new QuicHeadersStream(this));
134 }
135
136 QuicSession::~QuicSession() {
137   STLDeleteElements(&closed_streams_);
138   STLDeleteValues(&stream_map_);
139
140   DLOG_IF(WARNING,
141           locally_closed_streams_highest_offset_.size() > max_open_streams_)
142       << "Surprisingly high number of locally closed streams still waiting for "
143          "final byte offset: " << locally_closed_streams_highest_offset_.size();
144 }
145
146 void QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) {
147   for (size_t i = 0; i < frames.size(); ++i) {
148     // TODO(rch) deal with the error case of stream id 0.
149     const QuicStreamFrame& frame = frames[i];
150     QuicStreamId stream_id = frame.stream_id;
151     ReliableQuicStream* stream = GetStream(stream_id);
152     if (!stream) {
153       // The stream no longer exists, but we may still be interested in the
154       // final stream byte offset sent by the peer. A frame with a FIN can give
155       // us this offset.
156       if (frame.fin) {
157         QuicStreamOffset final_byte_offset =
158             frame.offset + frame.data.TotalBufferSize();
159         UpdateFlowControlOnFinalReceivedByteOffset(stream_id,
160                                                    final_byte_offset);
161       }
162
163       continue;
164     }
165     stream->OnStreamFrame(frames[i]);
166   }
167 }
168
169 void QuicSession::OnStreamHeaders(QuicStreamId stream_id,
170                                   StringPiece headers_data) {
171   QuicDataStream* stream = GetDataStream(stream_id);
172   if (!stream) {
173     // It's quite possible to receive headers after a stream has been reset.
174     return;
175   }
176   stream->OnStreamHeaders(headers_data);
177 }
178
179 void QuicSession::OnStreamHeadersPriority(QuicStreamId stream_id,
180                                           QuicPriority priority) {
181   QuicDataStream* stream = GetDataStream(stream_id);
182   if (!stream) {
183     // It's quite possible to receive headers after a stream has been reset.
184     return;
185   }
186   stream->OnStreamHeadersPriority(priority);
187 }
188
189 void QuicSession::OnStreamHeadersComplete(QuicStreamId stream_id,
190                                           bool fin,
191                                           size_t frame_len) {
192   QuicDataStream* stream = GetDataStream(stream_id);
193   if (!stream) {
194     // It's quite possible to receive headers after a stream has been reset.
195     return;
196   }
197   stream->OnStreamHeadersComplete(fin, frame_len);
198 }
199
200 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
201   if (frame.stream_id == kCryptoStreamId) {
202     connection()->SendConnectionCloseWithDetails(
203         QUIC_INVALID_STREAM_ID,
204         "Attempt to reset the crypto stream");
205     return;
206   }
207   if (frame.stream_id == kHeadersStreamId) {
208     connection()->SendConnectionCloseWithDetails(
209         QUIC_INVALID_STREAM_ID,
210         "Attempt to reset the headers stream");
211     return;
212   }
213
214   QuicDataStream* stream = GetDataStream(frame.stream_id);
215   if (!stream) {
216     // The RST frame contains the final byte offset for the stream: we can now
217     // update the connection level flow controller if needed.
218     UpdateFlowControlOnFinalReceivedByteOffset(frame.stream_id,
219                                                frame.byte_offset);
220     return;  // Errors are handled by GetStream.
221   }
222
223   stream->OnStreamReset(frame);
224 }
225
226 void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
227   DCHECK(frame.last_good_stream_id < next_stream_id_);
228   goaway_received_ = true;
229 }
230
231 void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) {
232   DCHECK(!connection_->connected());
233   if (error_ == QUIC_NO_ERROR) {
234     error_ = error;
235   }
236
237   while (!stream_map_.empty()) {
238     DataStreamMap::iterator it = stream_map_.begin();
239     QuicStreamId id = it->first;
240     it->second->OnConnectionClosed(error, from_peer);
241     // The stream should call CloseStream as part of OnConnectionClosed.
242     if (stream_map_.find(id) != stream_map_.end()) {
243       LOG(DFATAL) << ENDPOINT
244                   << "Stream failed to close under OnConnectionClosed";
245       CloseStream(id);
246     }
247   }
248 }
249
250 void QuicSession::OnWindowUpdateFrames(
251     const vector<QuicWindowUpdateFrame>& frames) {
252   bool connection_window_updated = false;
253   for (size_t i = 0; i < frames.size(); ++i) {
254     // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't
255     // assume that it still exists.
256     QuicStreamId stream_id = frames[i].stream_id;
257     if (stream_id == kConnectionLevelId) {
258       // This is a window update that applies to the connection, rather than an
259       // individual stream.
260       DVLOG(1) << ENDPOINT
261                << "Received connection level flow control window update with "
262                   "byte offset: " << frames[i].byte_offset;
263       if (flow_controller_->UpdateSendWindowOffset(frames[i].byte_offset)) {
264         connection_window_updated = true;
265       }
266       continue;
267     }
268
269     if (connection_->version() < QUIC_VERSION_21 &&
270         (stream_id == kCryptoStreamId || stream_id == kHeadersStreamId)) {
271       DLOG(DFATAL) << "WindowUpdate for stream " << stream_id << " in version "
272                    << QuicVersionToString(connection_->version());
273       return;
274     }
275
276     ReliableQuicStream* stream = GetStream(stream_id);
277     if (stream) {
278       stream->OnWindowUpdateFrame(frames[i]);
279     }
280   }
281
282   // Connection level flow control window has increased, so blocked streams can
283   // write again.
284   if (connection_window_updated) {
285     OnCanWrite();
286   }
287 }
288
289 void QuicSession::OnBlockedFrames(const vector<QuicBlockedFrame>& frames) {
290   for (size_t i = 0; i < frames.size(); ++i) {
291     // TODO(rjshade): Compare our flow control receive windows for specified
292     //                streams: if we have a large window then maybe something
293     //                had gone wrong with the flow control accounting.
294     DVLOG(1) << ENDPOINT << "Received BLOCKED frame with stream id: "
295              << frames[i].stream_id;
296   }
297 }
298
299 void QuicSession::OnCanWrite() {
300   // We limit the number of writes to the number of pending streams. If more
301   // streams become pending, WillingAndAbleToWrite will be true, which will
302   // cause the connection to request resumption before yielding to other
303   // connections.
304   size_t num_writes = write_blocked_streams_.NumBlockedStreams();
305   if (flow_controller_->IsBlocked()) {
306     // If we are connection level flow control blocked, then only allow the
307     // crypto and headers streams to try writing as all other streams will be
308     // blocked.
309     num_writes = 0;
310     if (write_blocked_streams_.crypto_stream_blocked()) {
311       num_writes += 1;
312     }
313     if (write_blocked_streams_.headers_stream_blocked()) {
314       num_writes += 1;
315     }
316   }
317   if (num_writes == 0) {
318     return;
319   }
320
321   QuicConnection::ScopedPacketBundler ack_bundler(
322       connection_.get(), QuicConnection::NO_ACK);
323   for (size_t i = 0; i < num_writes; ++i) {
324     if (!(write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() ||
325           write_blocked_streams_.HasWriteBlockedDataStreams())) {
326       // Writing one stream removed another!? Something's broken.
327       LOG(DFATAL) << "WriteBlockedStream is missing";
328       connection_->CloseConnection(QUIC_INTERNAL_ERROR, false);
329       return;
330     }
331     if (!connection_->CanWriteStreamData()) {
332       return;
333     }
334     QuicStreamId stream_id = write_blocked_streams_.PopFront();
335     if (stream_id == kCryptoStreamId) {
336       has_pending_handshake_ = false;  // We just popped it.
337     }
338     ReliableQuicStream* stream = GetStream(stream_id);
339     if (stream != NULL && !stream->flow_controller()->IsBlocked()) {
340       // If the stream can't write all bytes, it'll re-add itself to the blocked
341       // list.
342       stream->OnCanWrite();
343     }
344   }
345 }
346
347 bool QuicSession::WillingAndAbleToWrite() const {
348   // If the crypto or headers streams are blocked, we want to schedule a write -
349   // they don't get blocked by connection level flow control. Otherwise only
350   // schedule a write if we are not flow control blocked at the connection
351   // level.
352   return write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() ||
353          (!flow_controller_->IsBlocked() &&
354           write_blocked_streams_.HasWriteBlockedDataStreams());
355 }
356
357 bool QuicSession::HasPendingHandshake() const {
358   return has_pending_handshake_;
359 }
360
361 bool QuicSession::HasOpenDataStreams() const {
362   return GetNumOpenStreams() > 0;
363 }
364
365 QuicConsumedData QuicSession::WritevData(
366     QuicStreamId id,
367     const IOVector& data,
368     QuicStreamOffset offset,
369     bool fin,
370     FecProtection fec_protection,
371     QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
372   return connection_->SendStreamData(id, data, offset, fin, fec_protection,
373                                      ack_notifier_delegate);
374 }
375
376 size_t QuicSession::WriteHeaders(
377     QuicStreamId id,
378     const SpdyHeaderBlock& headers,
379     bool fin,
380     QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
381   return headers_stream_->WriteHeaders(id, headers, fin, ack_notifier_delegate);
382 }
383
384 void QuicSession::SendRstStream(QuicStreamId id,
385                                 QuicRstStreamErrorCode error,
386                                 QuicStreamOffset bytes_written) {
387   if (connection()->connected()) {
388     // Only send a RST_STREAM frame if still connected.
389     connection_->SendRstStream(id, error, bytes_written);
390   }
391   CloseStreamInner(id, true);
392 }
393
394 void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) {
395   if (goaway_sent_) {
396     return;
397   }
398   goaway_sent_ = true;
399   connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason);
400 }
401
402 void QuicSession::CloseStream(QuicStreamId stream_id) {
403   CloseStreamInner(stream_id, false);
404 }
405
406 void QuicSession::CloseStreamInner(QuicStreamId stream_id,
407                                    bool locally_reset) {
408   DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
409
410   DataStreamMap::iterator it = stream_map_.find(stream_id);
411   if (it == stream_map_.end()) {
412     DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
413     return;
414   }
415   QuicDataStream* stream = it->second;
416
417   // Tell the stream that a RST has been sent.
418   if (locally_reset) {
419     stream->set_rst_sent(true);
420   }
421
422   closed_streams_.push_back(it->second);
423
424   // If we haven't received a FIN or RST for this stream, we need to keep track
425   // of the how many bytes the stream's flow controller believes it has
426   // received, for accurate connection level flow control accounting.
427   if (!stream->HasFinalReceivedByteOffset() &&
428       stream->flow_controller()->IsEnabled()) {
429     locally_closed_streams_highest_offset_[stream_id] =
430         stream->flow_controller()->highest_received_byte_offset();
431   }
432
433   stream_map_.erase(it);
434   stream->OnClose();
435 }
436
437 void QuicSession::UpdateFlowControlOnFinalReceivedByteOffset(
438     QuicStreamId stream_id, QuicStreamOffset final_byte_offset) {
439   map<QuicStreamId, QuicStreamOffset>::iterator it =
440       locally_closed_streams_highest_offset_.find(stream_id);
441   if (it == locally_closed_streams_highest_offset_.end()) {
442     return;
443   }
444
445   DVLOG(1) << ENDPOINT << "Received final byte offset " << final_byte_offset
446            << " for stream " << stream_id;
447   uint64 offset_diff = final_byte_offset - it->second;
448   if (flow_controller_->UpdateHighestReceivedOffset(
449       flow_controller_->highest_received_byte_offset() + offset_diff)) {
450     // If the final offset violates flow control, close the connection now.
451     if (flow_controller_->FlowControlViolation()) {
452       connection_->SendConnectionClose(
453           QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA);
454       return;
455     }
456   }
457
458   flow_controller_->AddBytesConsumed(offset_diff);
459   locally_closed_streams_highest_offset_.erase(it);
460 }
461
462 bool QuicSession::IsEncryptionEstablished() {
463   return GetCryptoStream()->encryption_established();
464 }
465
466 bool QuicSession::IsCryptoHandshakeConfirmed() {
467   return GetCryptoStream()->handshake_confirmed();
468 }
469
470 void QuicSession::OnConfigNegotiated() {
471   connection_->SetFromConfig(config_);
472   QuicVersion version = connection()->version();
473
474   // A server should accept a small number of additional streams beyond the
475   // limit sent to the client. This helps avoid early connection termination
476   // when FIN/RSTs for old streams are lost or arrive out of order.
477   if (FLAGS_quic_allow_more_open_streams) {
478     set_max_open_streams((is_server() ? kMaxStreamsMultiplier : 1.0) *
479                          config_.max_streams_per_connection());
480   }
481
482   if (version <= QUIC_VERSION_16) {
483     return;
484   }
485
486   if (version <= QUIC_VERSION_19) {
487     // QUIC_VERSION_17,18,19 don't support independent stream/session flow
488     // control windows.
489     if (config_.HasReceivedInitialFlowControlWindowBytes()) {
490       // Streams which were created before the SHLO was received (0-RTT
491       // requests) are now informed of the peer's initial flow control window.
492       uint32 new_window = config_.ReceivedInitialFlowControlWindowBytes();
493       OnNewStreamFlowControlWindow(new_window);
494       OnNewSessionFlowControlWindow(new_window);
495     }
496
497     return;
498   }
499
500   // QUIC_VERSION_21 and higher can have independent stream and session flow
501   // control windows.
502   if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) {
503     // Streams which were created before the SHLO was received (0-RTT
504     // requests) are now informed of the peer's initial flow control window.
505     OnNewStreamFlowControlWindow(
506         config_.ReceivedInitialStreamFlowControlWindowBytes());
507   }
508   if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) {
509     OnNewSessionFlowControlWindow(
510         config_.ReceivedInitialSessionFlowControlWindowBytes());
511   }
512 }
513
514 void QuicSession::OnNewStreamFlowControlWindow(uint32 new_window) {
515   if (new_window < kDefaultFlowControlSendWindow) {
516     LOG(ERROR)
517         << "Peer sent us an invalid stream flow control send window: "
518         << new_window << ", below default: " << kDefaultFlowControlSendWindow;
519     if (connection_->connected()) {
520       connection_->SendConnectionClose(QUIC_FLOW_CONTROL_INVALID_WINDOW);
521     }
522     return;
523   }
524
525   // Inform all existing streams about the new window.
526   if (connection_->version() >= QUIC_VERSION_21) {
527     GetCryptoStream()->flow_controller()->UpdateSendWindowOffset(new_window);
528     headers_stream_->flow_controller()->UpdateSendWindowOffset(new_window);
529   }
530   for (DataStreamMap::iterator it = stream_map_.begin();
531        it != stream_map_.end(); ++it) {
532     it->second->flow_controller()->UpdateSendWindowOffset(new_window);
533   }
534 }
535
536 void QuicSession::OnNewSessionFlowControlWindow(uint32 new_window) {
537   if (new_window < kDefaultFlowControlSendWindow) {
538     LOG(ERROR)
539         << "Peer sent us an invalid session flow control send window: "
540         << new_window << ", below default: " << kDefaultFlowControlSendWindow;
541     if (connection_->connected()) {
542       connection_->SendConnectionClose(QUIC_FLOW_CONTROL_INVALID_WINDOW);
543     }
544     return;
545   }
546
547   flow_controller_->UpdateSendWindowOffset(new_window);
548 }
549
550 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
551   switch (event) {
552     // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
553     // to QuicSession since it is the glue.
554     case ENCRYPTION_FIRST_ESTABLISHED:
555       break;
556
557     case ENCRYPTION_REESTABLISHED:
558       // Retransmit originally packets that were sent, since they can't be
559       // decrypted by the peer.
560       connection_->RetransmitUnackedPackets(ALL_INITIAL_RETRANSMISSION);
561       break;
562
563     case HANDSHAKE_CONFIRMED:
564       LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT
565           << "Handshake confirmed without parameter negotiation.";
566       // Discard originally encrypted packets, since they can't be decrypted by
567       // the peer.
568       connection_->NeuterUnencryptedPackets();
569       connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
570       if (!FLAGS_quic_allow_more_open_streams) {
571         max_open_streams_ = config_.max_streams_per_connection();
572       }
573       break;
574
575     default:
576       LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
577   }
578 }
579
580 void QuicSession::OnCryptoHandshakeMessageSent(
581     const CryptoHandshakeMessage& message) {
582 }
583
584 void QuicSession::OnCryptoHandshakeMessageReceived(
585     const CryptoHandshakeMessage& message) {
586 }
587
588 QuicConfig* QuicSession::config() {
589   return &config_;
590 }
591
592 void QuicSession::ActivateStream(QuicDataStream* stream) {
593   DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size()
594            << ". activating " << stream->id();
595   DCHECK_EQ(stream_map_.count(stream->id()), 0u);
596   stream_map_[stream->id()] = stream;
597 }
598
599 QuicStreamId QuicSession::GetNextStreamId() {
600   QuicStreamId id = next_stream_id_;
601   next_stream_id_ += 2;
602   return id;
603 }
604
605 ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) {
606   if (stream_id == kCryptoStreamId) {
607     return GetCryptoStream();
608   }
609   if (stream_id == kHeadersStreamId) {
610     return headers_stream_.get();
611   }
612   return GetDataStream(stream_id);
613 }
614
615 QuicDataStream* QuicSession::GetDataStream(const QuicStreamId stream_id) {
616   if (stream_id == kCryptoStreamId) {
617     DLOG(FATAL) << "Attempt to call GetDataStream with the crypto stream id";
618     return NULL;
619   }
620   if (stream_id == kHeadersStreamId) {
621     DLOG(FATAL) << "Attempt to call GetDataStream with the headers stream id";
622     return NULL;
623   }
624
625   DataStreamMap::iterator it = stream_map_.find(stream_id);
626   if (it != stream_map_.end()) {
627     return it->second;
628   }
629
630   if (IsClosedStream(stream_id)) {
631     return NULL;
632   }
633
634   if (stream_id % 2 == next_stream_id_ % 2) {
635     // We've received a frame for a locally-created stream that is not
636     // currently active.  This is an error.
637     if (connection()->connected()) {
638       connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM);
639     }
640     return NULL;
641   }
642
643   return GetIncomingDataStream(stream_id);
644 }
645
646 QuicDataStream* QuicSession::GetIncomingDataStream(QuicStreamId stream_id) {
647   if (IsClosedStream(stream_id)) {
648     return NULL;
649   }
650
651   implicitly_created_streams_.erase(stream_id);
652   if (stream_id > largest_peer_created_stream_id_) {
653     if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) {
654       // We may already have sent a connection close due to multiple reset
655       // streams in the same packet.
656       if (connection()->connected()) {
657         LOG(ERROR) << "Trying to get stream: " << stream_id
658                    << ", largest peer created stream: "
659                    << largest_peer_created_stream_id_
660                    << ", max delta: " << kMaxStreamIdDelta;
661         connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID);
662       }
663       return NULL;
664     }
665     if (largest_peer_created_stream_id_ == 0) {
666       if (is_server()) {
667         largest_peer_created_stream_id_= 3;
668       } else {
669         largest_peer_created_stream_id_= 1;
670       }
671     }
672     for (QuicStreamId id = largest_peer_created_stream_id_ + 2;
673          id < stream_id;
674          id += 2) {
675       implicitly_created_streams_.insert(id);
676     }
677     largest_peer_created_stream_id_ = stream_id;
678   }
679   QuicDataStream* stream = CreateIncomingDataStream(stream_id);
680   if (stream == NULL) {
681     return NULL;
682   }
683   ActivateStream(stream);
684   return stream;
685 }
686
687 void QuicSession::set_max_open_streams(size_t max_open_streams) {
688   DVLOG(1) << "Setting max_open_streams_ to " << max_open_streams;
689   max_open_streams_ = max_open_streams;
690 }
691
692 bool QuicSession::IsClosedStream(QuicStreamId id) {
693   DCHECK_NE(0u, id);
694   if (id == kCryptoStreamId) {
695     return false;
696   }
697   if (id == kHeadersStreamId) {
698     return false;
699   }
700   if (ContainsKey(stream_map_, id)) {
701     // Stream is active
702     return false;
703   }
704   if (id % 2 == next_stream_id_ % 2) {
705     // Locally created streams are strictly in-order.  If the id is in the
706     // range of created streams and it's not active, it must have been closed.
707     return id < next_stream_id_;
708   }
709   // For peer created streams, we also need to consider implicitly created
710   // streams.
711   return id <= largest_peer_created_stream_id_ &&
712       implicitly_created_streams_.count(id) == 0;
713 }
714
715 size_t QuicSession::GetNumOpenStreams() const {
716   return stream_map_.size() + implicitly_created_streams_.size();
717 }
718
719 void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) {
720 #ifndef NDEBUG
721   ReliableQuicStream* stream = GetStream(id);
722   if (stream != NULL) {
723     LOG_IF(DFATAL, priority != stream->EffectivePriority())
724         << ENDPOINT << "Stream " << id
725         << "Priorities do not match.  Got: " << priority
726         << " Expected: " << stream->EffectivePriority();
727   } else {
728     LOG(DFATAL) << "Marking unknown stream " << id << " blocked.";
729   }
730 #endif
731
732   if (id == kCryptoStreamId) {
733     DCHECK(!has_pending_handshake_);
734     has_pending_handshake_ = true;
735     // TODO(jar): Be sure to use the highest priority for the crypto stream,
736     // perhaps by adding a "special" priority for it that is higher than
737     // kHighestPriority.
738     priority = kHighestPriority;
739   }
740   write_blocked_streams_.PushBack(id, priority);
741 }
742
743 bool QuicSession::HasDataToWrite() const {
744   return write_blocked_streams_.HasWriteBlockedCryptoOrHeadersStream() ||
745          write_blocked_streams_.HasWriteBlockedDataStreams() ||
746          connection_->HasQueuedData();
747 }
748
749 bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) const {
750   NOTIMPLEMENTED();
751   return false;
752 }
753
754 void QuicSession::PostProcessAfterData() {
755   STLDeleteElements(&closed_streams_);
756   closed_streams_.clear();
757
758   if (FLAGS_close_quic_connection_unfinished_streams_2 &&
759       connection()->connected() &&
760       locally_closed_streams_highest_offset_.size() > max_open_streams_) {
761     // A buggy client may fail to send FIN/RSTs. Don't tolerate this.
762     connection_->SendConnectionClose(QUIC_TOO_MANY_UNFINISHED_STREAMS);
763   }
764 }
765
766 void QuicSession::OnSuccessfulVersionNegotiation(const QuicVersion& version) {
767   if (version < QUIC_VERSION_19) {
768     flow_controller_->Disable();
769   }
770
771   // Disable stream level flow control based on negotiated version. Streams may
772   // have been created with a different version.
773   if (version < QUIC_VERSION_21) {
774     GetCryptoStream()->flow_controller()->Disable();
775     headers_stream_->flow_controller()->Disable();
776   }
777   for (DataStreamMap::iterator it = stream_map_.begin();
778        it != stream_map_.end(); ++it) {
779     if (version <= QUIC_VERSION_16) {
780       it->second->flow_controller()->Disable();
781     }
782   }
783 }
784
785 }  // namespace net