Upstream version 5.34.104.0
[platform/framework/web/crosswalk.git] / src / net / quic / quic_session.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/quic/quic_session.h"
6
7 #include "base/stl_util.h"
8 #include "net/quic/crypto/proof_verifier.h"
9 #include "net/quic/quic_connection.h"
10 #include "net/quic/quic_headers_stream.h"
11 #include "net/ssl/ssl_info.h"
12
13 using base::StringPiece;
14 using base::hash_map;
15 using base::hash_set;
16 using std::make_pair;
17 using std::vector;
18
19 namespace net {
20
21 const size_t kMaxPrematurelyClosedStreamsTracked = 20;
22 const size_t kMaxZombieStreams = 20;
23
24 #define ENDPOINT (is_server() ? "Server: " : " Client: ")
25
26 // We want to make sure we delete any closed streams in a safe manner.
27 // To avoid deleting a stream in mid-operation, we have a simple shim between
28 // us and the stream, so we can delete any streams when we return from
29 // processing.
30 //
31 // We could just override the base methods, but this makes it easier to make
32 // sure we don't miss any.
33 class VisitorShim : public QuicConnectionVisitorInterface {
34  public:
35   explicit VisitorShim(QuicSession* session) : session_(session) {}
36
37   virtual bool OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE {
38     bool accepted = session_->OnStreamFrames(frames);
39     session_->PostProcessAfterData();
40     return accepted;
41   }
42   virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE {
43     session_->OnRstStream(frame);
44     session_->PostProcessAfterData();
45   }
46
47   virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE {
48     session_->OnGoAway(frame);
49     session_->PostProcessAfterData();
50   }
51
52   virtual bool OnCanWrite() OVERRIDE {
53     bool rc = session_->OnCanWrite();
54     session_->PostProcessAfterData();
55     return rc;
56   }
57
58   virtual void OnSuccessfulVersionNegotiation(
59       const QuicVersion& version) OVERRIDE {
60     session_->OnSuccessfulVersionNegotiation(version);
61   }
62
63   virtual void OnConnectionClosed(
64       QuicErrorCode error, bool from_peer) OVERRIDE {
65     session_->OnConnectionClosed(error, from_peer);
66     // The session will go away, so don't bother with cleanup.
67   }
68
69   virtual void OnWriteBlocked() OVERRIDE {
70     session_->OnWriteBlocked();
71   }
72
73   virtual bool HasPendingHandshake() const OVERRIDE {
74     return session_->HasPendingHandshake();
75   }
76
77  private:
78   QuicSession* session_;
79 };
80
81 QuicSession::QuicSession(QuicConnection* connection,
82                          const QuicConfig& config)
83     : connection_(connection),
84       visitor_shim_(new VisitorShim(this)),
85       config_(config),
86       max_open_streams_(config_.max_streams_per_connection()),
87       next_stream_id_(is_server() ? 2 : 3),
88       largest_peer_created_stream_id_(0),
89       error_(QUIC_NO_ERROR),
90       goaway_received_(false),
91       goaway_sent_(false),
92       has_pending_handshake_(false) {
93
94   connection_->set_visitor(visitor_shim_.get());
95   connection_->SetFromConfig(config_);
96   if (connection_->connected()) {
97     connection_->SetOverallConnectionTimeout(
98         config_.max_time_before_crypto_handshake());
99   }
100   if (connection_->version() > QUIC_VERSION_12) {
101     headers_stream_.reset(new QuicHeadersStream(this));
102     if (!is_server()) {
103       // For version above QUIC v12, the headers stream is stream 3, so the
104       // next available local stream ID should be 5.
105       DCHECK_EQ(kHeadersStreamId, next_stream_id_);
106       next_stream_id_ += 2;
107     }
108   }
109 }
110
111 QuicSession::~QuicSession() {
112   STLDeleteElements(&closed_streams_);
113   STLDeleteValues(&stream_map_);
114 }
115
116 bool QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) {
117   for (size_t i = 0; i < frames.size(); ++i) {
118     // TODO(rch) deal with the error case of stream id 0
119     if (IsClosedStream(frames[i].stream_id)) {
120       // If we get additional frames for a stream where we didn't process
121       // headers, it's highly likely our compression context will end up
122       // permanently out of sync with the peer's, so we give up and close the
123       // connection.
124       if (ContainsKey(prematurely_closed_streams_, frames[i].stream_id)) {
125         connection()->SendConnectionClose(
126             QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
127         return false;
128       }
129       continue;
130     }
131
132     ReliableQuicStream* stream = GetStream(frames[i].stream_id);
133     if (stream == NULL) return false;
134     if (!stream->WillAcceptStreamFrame(frames[i])) return false;
135
136     // TODO(alyssar) check against existing connection address: if changed, make
137     // sure we update the connection.
138   }
139
140   for (size_t i = 0; i < frames.size(); ++i) {
141     QuicStreamId stream_id = frames[i].stream_id;
142     ReliableQuicStream* stream = GetStream(stream_id);
143     if (!stream) {
144       continue;
145     }
146     stream->OnStreamFrame(frames[i]);
147
148     // If the stream is a data stream had been prematurely closed, and the
149     // headers are now decompressed, then we are finally finished
150     // with this stream.
151     if (ContainsKey(zombie_streams_, stream_id) &&
152         static_cast<QuicDataStream*>(stream)->headers_decompressed()) {
153       CloseZombieStream(stream_id);
154     }
155   }
156
157   while (!decompression_blocked_streams_.empty()) {
158     QuicHeaderId header_id = decompression_blocked_streams_.begin()->first;
159     if (header_id != decompressor_.current_header_id()) {
160       break;
161     }
162     QuicStreamId stream_id = decompression_blocked_streams_.begin()->second;
163     decompression_blocked_streams_.erase(header_id);
164     QuicDataStream* stream = GetDataStream(stream_id);
165     if (!stream) {
166       connection()->SendConnectionClose(
167           QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
168       return false;
169     }
170     stream->OnDecompressorAvailable();
171   }
172   return true;
173 }
174
175 void QuicSession::OnStreamHeaders(QuicStreamId stream_id,
176                                   StringPiece headers_data) {
177   QuicDataStream* stream = GetDataStream(stream_id);
178   if (!stream) {
179     // It's quite possible to receive headers after a stream has been reset.
180     return;
181   }
182   stream->OnStreamHeaders(headers_data);
183 }
184
185 void QuicSession::OnStreamHeadersPriority(QuicStreamId stream_id,
186                                           QuicPriority priority) {
187   QuicDataStream* stream = GetDataStream(stream_id);
188   if (!stream) {
189     // It's quite possible to receive headers after a stream has been reset.
190     return;
191   }
192   stream->OnStreamHeadersPriority(priority);
193 }
194
195 void QuicSession::OnStreamHeadersComplete(QuicStreamId stream_id,
196                                           bool fin,
197                                           size_t frame_len) {
198   QuicDataStream* stream = GetDataStream(stream_id);
199   if (!stream) {
200     // It's quite possible to receive headers after a stream has been reset.
201     return;
202   }
203   stream->OnStreamHeadersComplete(fin, frame_len);
204 }
205
206 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
207   if (frame.stream_id == kCryptoStreamId) {
208     connection()->SendConnectionCloseWithDetails(
209         QUIC_INVALID_STREAM_ID,
210         "Attempt to reset the crypto stream");
211     return;
212   }
213   if (frame.stream_id == kHeadersStreamId &&
214       connection()->version() > QUIC_VERSION_12) {
215     connection()->SendConnectionCloseWithDetails(
216         QUIC_INVALID_STREAM_ID,
217         "Attempt to reset the headers stream");
218     return;
219   }
220   QuicDataStream* stream = GetDataStream(frame.stream_id);
221   if (!stream) {
222     return;  // Errors are handled by GetStream.
223   }
224   if (ContainsKey(zombie_streams_, stream->id())) {
225     // If this was a zombie stream then we close it out now.
226     CloseZombieStream(stream->id());
227     // However, since the headers still have not been decompressed, we want to
228     // mark it a prematurely closed so that if we ever receive frames
229     // for this stream we can close the connection.
230     DCHECK(!stream->headers_decompressed());
231     AddPrematurelyClosedStream(frame.stream_id);
232     return;
233   }
234   if (connection()->version() <= QUIC_VERSION_12) {
235     if (stream->stream_bytes_read() > 0 && !stream->headers_decompressed()) {
236       connection()->SendConnectionClose(
237           QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
238     }
239   }
240   stream->OnStreamReset(frame);
241 }
242
243 void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
244   DCHECK(frame.last_good_stream_id < next_stream_id_);
245   goaway_received_ = true;
246 }
247
248 void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) {
249   DCHECK(!connection_->connected());
250   if (error_ == QUIC_NO_ERROR) {
251     error_ = error;
252   }
253
254   while (!stream_map_.empty()) {
255     DataStreamMap::iterator it = stream_map_.begin();
256     QuicStreamId id = it->first;
257     it->second->OnConnectionClosed(error, from_peer);
258     // The stream should call CloseStream as part of OnConnectionClosed.
259     if (stream_map_.find(id) != stream_map_.end()) {
260       LOG(DFATAL) << ENDPOINT
261                   << "Stream failed to close under OnConnectionClosed";
262       CloseStream(id);
263     }
264   }
265 }
266
267 bool QuicSession::OnCanWrite() {
268   // We latch this here rather than doing a traditional loop, because streams
269   // may be modifying the list as we loop.
270   int remaining_writes = write_blocked_streams_.NumBlockedStreams();
271
272   while (remaining_writes > 0 && connection_->CanWriteStreamData()) {
273     DCHECK(write_blocked_streams_.HasWriteBlockedStreams());
274     if (!write_blocked_streams_.HasWriteBlockedStreams()) {
275       LOG(DFATAL) << "WriteBlockedStream is missing";
276       connection_->CloseConnection(QUIC_INTERNAL_ERROR, false);
277       return true;  // We have no write blocked streams.
278     }
279     QuicStreamId stream_id = write_blocked_streams_.PopFront();
280     if (stream_id == kCryptoStreamId) {
281       has_pending_handshake_ = false;  // We just popped it.
282     }
283     ReliableQuicStream* stream = GetStream(stream_id);
284     if (stream != NULL) {
285       // If the stream can't write all bytes, it'll re-add itself to the blocked
286       // list.
287       stream->OnCanWrite();
288     }
289     --remaining_writes;
290   }
291
292   return !write_blocked_streams_.HasWriteBlockedStreams();
293 }
294
295 bool QuicSession::HasPendingHandshake() const {
296   return has_pending_handshake_;
297 }
298
299 QuicConsumedData QuicSession::WritevData(
300     QuicStreamId id,
301     const struct iovec* iov,
302     int iov_count,
303     QuicStreamOffset offset,
304     bool fin,
305     QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
306   IOVector data;
307   data.AppendIovec(iov, iov_count);
308   return connection_->SendStreamData(id, data, offset, fin,
309                                      ack_notifier_delegate);
310 }
311
312 size_t QuicSession::WriteHeaders(QuicStreamId id,
313                                const SpdyHeaderBlock& headers,
314                                bool fin) {
315   DCHECK_LT(QUIC_VERSION_12, connection()->version());
316   if (connection()->version() <= QUIC_VERSION_12) {
317     return 0;
318   }
319   return headers_stream_->WriteHeaders(id, headers, fin);
320 }
321
322 void QuicSession::SendRstStream(QuicStreamId id,
323                                 QuicRstStreamErrorCode error,
324                                 QuicStreamOffset bytes_written) {
325   connection_->SendRstStream(id, error, bytes_written);
326   CloseStreamInner(id, true);
327 }
328
329 void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) {
330   goaway_sent_ = true;
331   connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason);
332 }
333
334 void QuicSession::CloseStream(QuicStreamId stream_id) {
335   CloseStreamInner(stream_id, false);
336 }
337
338 void QuicSession::CloseStreamInner(QuicStreamId stream_id,
339                                    bool locally_reset) {
340   DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
341
342   DataStreamMap::iterator it = stream_map_.find(stream_id);
343   if (it == stream_map_.end()) {
344     DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
345     return;
346   }
347   QuicDataStream* stream = it->second;
348
349   // Tell the stream that a RST has been sent.
350   if (locally_reset) {
351     stream->set_rst_sent(true);
352   }
353
354   if (connection_->version() <= QUIC_VERSION_12 &&
355       connection_->connected() && !stream->headers_decompressed()) {
356     // If the stream is being closed locally (for example a client cancelling
357     // a request before receiving the response) then we need to make sure that
358     // we keep the stream alive long enough to process any response or
359     // RST_STREAM frames.
360     if (locally_reset && !is_server()) {
361       AddZombieStream(stream_id);
362       return;
363     }
364
365     // This stream has been closed before the headers were decompressed.
366     // This might cause problems with head of line blocking of headers.
367     // If the peer sent headers which were lost but we now close the stream
368     // we will never be able to decompress headers for other streams.
369     // To deal with this, we keep track of streams which have been closed
370     // prematurely.  If we ever receive data frames for this steam, then we
371     // know there actually has been a problem and we close the connection.
372     AddPrematurelyClosedStream(stream->id());
373   }
374   closed_streams_.push_back(it->second);
375   if (ContainsKey(zombie_streams_, stream->id())) {
376     zombie_streams_.erase(stream->id());
377   }
378   stream_map_.erase(it);
379   stream->OnClose();
380 }
381
382 void QuicSession::AddZombieStream(QuicStreamId stream_id) {
383   if (zombie_streams_.size() == kMaxZombieStreams) {
384     QuicStreamId oldest_zombie_stream_id = zombie_streams_.begin()->first;
385     CloseZombieStream(oldest_zombie_stream_id);
386     // However, since the headers still have not been decompressed, we want to
387     // mark it a prematurely closed so that if we ever receive frames
388     // for this stream we can close the connection.
389     AddPrematurelyClosedStream(oldest_zombie_stream_id);
390   }
391   zombie_streams_.insert(make_pair(stream_id, true));
392 }
393
394 void QuicSession::CloseZombieStream(QuicStreamId stream_id) {
395   DCHECK(ContainsKey(zombie_streams_, stream_id));
396   zombie_streams_.erase(stream_id);
397   QuicDataStream* stream = GetDataStream(stream_id);
398   if (!stream) {
399     return;
400   }
401   stream_map_.erase(stream_id);
402   stream->OnClose();
403   closed_streams_.push_back(stream);
404 }
405
406 void QuicSession::AddPrematurelyClosedStream(QuicStreamId stream_id) {
407   if (connection()->version() > QUIC_VERSION_12) {
408     return;
409   }
410   if (prematurely_closed_streams_.size() ==
411       kMaxPrematurelyClosedStreamsTracked) {
412     prematurely_closed_streams_.erase(prematurely_closed_streams_.begin());
413   }
414   prematurely_closed_streams_.insert(make_pair(stream_id, true));
415 }
416
417 bool QuicSession::IsEncryptionEstablished() {
418   return GetCryptoStream()->encryption_established();
419 }
420
421 bool QuicSession::IsCryptoHandshakeConfirmed() {
422   return GetCryptoStream()->handshake_confirmed();
423 }
424
425 void QuicSession::OnConfigNegotiated() {
426   connection_->SetFromConfig(config_);
427 }
428
429 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
430   switch (event) {
431     // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
432     // to QuicSession since it is the glue.
433     case ENCRYPTION_FIRST_ESTABLISHED:
434       break;
435
436     case ENCRYPTION_REESTABLISHED:
437       // Retransmit originally packets that were sent, since they can't be
438       // decrypted by the peer.
439       connection_->RetransmitUnackedPackets(INITIAL_ENCRYPTION_ONLY);
440       break;
441
442     case HANDSHAKE_CONFIRMED:
443       LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT
444           << "Handshake confirmed without parameter negotiation.";
445       connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
446       max_open_streams_ = config_.max_streams_per_connection();
447       break;
448
449     default:
450       LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
451   }
452 }
453
454 void QuicSession::OnCryptoHandshakeMessageSent(
455     const CryptoHandshakeMessage& message) {
456 }
457
458 void QuicSession::OnCryptoHandshakeMessageReceived(
459     const CryptoHandshakeMessage& message) {
460 }
461
462 QuicConfig* QuicSession::config() {
463   return &config_;
464 }
465
466 void QuicSession::ActivateStream(QuicDataStream* stream) {
467   DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size()
468              << ". activating " << stream->id();
469   DCHECK_EQ(stream_map_.count(stream->id()), 0u);
470   stream_map_[stream->id()] = stream;
471 }
472
473 QuicStreamId QuicSession::GetNextStreamId() {
474   QuicStreamId id = next_stream_id_;
475   next_stream_id_ += 2;
476   return id;
477 }
478
479 ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) {
480   if (stream_id == kCryptoStreamId) {
481     return GetCryptoStream();
482   }
483   if (stream_id == kHeadersStreamId &&
484       connection_->version() > QUIC_VERSION_12) {
485     return headers_stream_.get();
486   }
487   return GetDataStream(stream_id);
488 }
489
490 QuicDataStream* QuicSession::GetDataStream(const QuicStreamId stream_id) {
491   if (stream_id == kCryptoStreamId) {
492     DLOG(FATAL) << "Attempt to call GetDataStream with the crypto stream id";
493     return NULL;
494   }
495   if (stream_id == kHeadersStreamId &&
496       connection_->version() > QUIC_VERSION_12) {
497     DLOG(FATAL) << "Attempt to call GetDataStream with the headers stream id";
498     return NULL;
499   }
500
501   DataStreamMap::iterator it = stream_map_.find(stream_id);
502   if (it != stream_map_.end()) {
503     return it->second;
504   }
505
506   if (IsClosedStream(stream_id)) {
507     return NULL;
508   }
509
510   if (stream_id % 2 == next_stream_id_ % 2) {
511     // We've received a frame for a locally-created stream that is not
512     // currently active.  This is an error.
513     if (connection()->connected()) {
514       connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM);
515     }
516     return NULL;
517   }
518
519   return GetIncomingDataStream(stream_id);
520 }
521
522 QuicDataStream* QuicSession::GetIncomingDataStream(QuicStreamId stream_id) {
523   if (IsClosedStream(stream_id)) {
524     return NULL;
525   }
526
527   if (goaway_sent_) {
528     // We've already sent a GoAway
529     SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0);
530     return NULL;
531   }
532
533   implicitly_created_streams_.erase(stream_id);
534   if (stream_id > largest_peer_created_stream_id_) {
535     // TODO(rch) add unit test for this
536     if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) {
537       connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID);
538       return NULL;
539     }
540     if (largest_peer_created_stream_id_ == 0) {
541       if (is_server() && connection()->version() > QUIC_VERSION_12) {
542         largest_peer_created_stream_id_= 3;
543       } else {
544         largest_peer_created_stream_id_= 1;
545       }
546     }
547     for (QuicStreamId id = largest_peer_created_stream_id_ + 2;
548          id < stream_id;
549          id += 2) {
550       implicitly_created_streams_.insert(id);
551     }
552     largest_peer_created_stream_id_ = stream_id;
553   }
554   QuicDataStream* stream = CreateIncomingDataStream(stream_id);
555   if (stream == NULL) {
556     return NULL;
557   }
558   ActivateStream(stream);
559   return stream;
560 }
561
562 bool QuicSession::IsClosedStream(QuicStreamId id) {
563   DCHECK_NE(0u, id);
564   if (id == kCryptoStreamId) {
565     return false;
566   }
567   if (connection()->version() > QUIC_VERSION_12) {
568     if (id == kHeadersStreamId) {
569       return false;
570     }
571   }
572   if (ContainsKey(zombie_streams_, id)) {
573     return true;
574   }
575   if (ContainsKey(stream_map_, id)) {
576     // Stream is active
577     return false;
578   }
579   if (id % 2 == next_stream_id_ % 2) {
580     // Locally created streams are strictly in-order.  If the id is in the
581     // range of created streams and it's not active, it must have been closed.
582     return id < next_stream_id_;
583   }
584   // For peer created streams, we also need to consider implicitly created
585   // streams.
586   return id <= largest_peer_created_stream_id_ &&
587       implicitly_created_streams_.count(id) == 0;
588 }
589
590 size_t QuicSession::GetNumOpenStreams() const {
591   return stream_map_.size() + implicitly_created_streams_.size() -
592       zombie_streams_.size();
593 }
594
595 void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) {
596 #ifndef NDEBUG
597   ReliableQuicStream* stream = GetStream(id);
598   if (stream != NULL) {
599     LOG_IF(DFATAL, priority != stream->EffectivePriority())
600         << "Priorities do not match.  Got: " << priority
601         << " Expected: " << stream->EffectivePriority();
602   } else {
603     LOG(DFATAL) << "Marking unknown stream " << id << " blocked.";
604   }
605 #endif
606
607   if (id == kCryptoStreamId) {
608     DCHECK(!has_pending_handshake_);
609     has_pending_handshake_ = true;
610     // TODO(jar): Be sure to use the highest priority for the crypto stream,
611     // perhaps by adding a "special" priority for it that is higher than
612     // kHighestPriority.
613     priority = kHighestPriority;
614   }
615   write_blocked_streams_.PushBack(id, priority, connection()->version());
616 }
617
618 bool QuicSession::HasDataToWrite() const {
619   return write_blocked_streams_.HasWriteBlockedStreams() ||
620       connection_->HasQueuedData();
621 }
622
623 void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id,
624                                            QuicStreamId stream_id) {
625   DCHECK_GE(QUIC_VERSION_12, connection()->version());
626   decompression_blocked_streams_[header_id] = stream_id;
627 }
628
629 bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) {
630   NOTIMPLEMENTED();
631   return false;
632 }
633
634 void QuicSession::PostProcessAfterData() {
635   STLDeleteElements(&closed_streams_);
636   closed_streams_.clear();
637 }
638
639 }  // namespace net