Upstream version 5.34.92.0
[platform/framework/web/crosswalk.git] / src / net / quic / quic_session.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/quic/quic_session.h"
6
7 #include "base/stl_util.h"
8 #include "net/quic/crypto/proof_verifier.h"
9 #include "net/quic/quic_connection.h"
10 #include "net/quic/quic_headers_stream.h"
11 #include "net/ssl/ssl_info.h"
12
13 using base::StringPiece;
14 using base::hash_map;
15 using base::hash_set;
16 using std::make_pair;
17 using std::vector;
18
19 namespace net {
20
21 const size_t kMaxPrematurelyClosedStreamsTracked = 20;
22 const size_t kMaxZombieStreams = 20;
23
24 #define ENDPOINT (is_server() ? "Server: " : " Client: ")
25
26 // We want to make sure we delete any closed streams in a safe manner.
27 // To avoid deleting a stream in mid-operation, we have a simple shim between
28 // us and the stream, so we can delete any streams when we return from
29 // processing.
30 //
31 // We could just override the base methods, but this makes it easier to make
32 // sure we don't miss any.
33 class VisitorShim : public QuicConnectionVisitorInterface {
34  public:
35   explicit VisitorShim(QuicSession* session) : session_(session) {}
36
37   virtual bool OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE {
38     bool accepted = session_->OnStreamFrames(frames);
39     session_->PostProcessAfterData();
40     return accepted;
41   }
42   virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE {
43     session_->OnRstStream(frame);
44     session_->PostProcessAfterData();
45   }
46
47   virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE {
48     session_->OnGoAway(frame);
49     session_->PostProcessAfterData();
50   }
51
52   virtual bool OnCanWrite() OVERRIDE {
53     bool rc = session_->OnCanWrite();
54     session_->PostProcessAfterData();
55     return rc;
56   }
57
58   virtual void OnSuccessfulVersionNegotiation(
59       const QuicVersion& version) OVERRIDE {
60     session_->OnSuccessfulVersionNegotiation(version);
61   }
62
63   virtual void OnConfigNegotiated() OVERRIDE {
64     session_->OnConfigNegotiated();
65   }
66
67   virtual void OnConnectionClosed(
68       QuicErrorCode error, bool from_peer) OVERRIDE {
69     session_->OnConnectionClosed(error, from_peer);
70     // The session will go away, so don't bother with cleanup.
71   }
72
73   virtual void OnWriteBlocked() OVERRIDE {
74     session_->OnWriteBlocked();
75   }
76
77   virtual bool HasPendingHandshake() const OVERRIDE {
78     return session_->HasPendingHandshake();
79   }
80
81  private:
82   QuicSession* session_;
83 };
84
85 QuicSession::QuicSession(QuicConnection* connection,
86                          const QuicConfig& config)
87     : connection_(connection),
88       visitor_shim_(new VisitorShim(this)),
89       config_(config),
90       max_open_streams_(config_.max_streams_per_connection()),
91       next_stream_id_(is_server() ? 2 : 3),
92       largest_peer_created_stream_id_(0),
93       error_(QUIC_NO_ERROR),
94       goaway_received_(false),
95       goaway_sent_(false),
96       has_pending_handshake_(false) {
97
98   connection_->set_visitor(visitor_shim_.get());
99   connection_->SetFromConfig(config_);
100   if (connection_->connected()) {
101     connection_->SetOverallConnectionTimeout(
102         config_.max_time_before_crypto_handshake());
103   }
104   if (connection_->version() > QUIC_VERSION_12) {
105     headers_stream_.reset(new QuicHeadersStream(this));
106     if (!is_server()) {
107       // For version above QUIC v12, the headers stream is stream 3, so the
108       // next available local stream ID should be 5.
109       DCHECK_EQ(kHeadersStreamId, next_stream_id_);
110       next_stream_id_ += 2;
111     }
112   }
113 }
114
115 QuicSession::~QuicSession() {
116   STLDeleteElements(&closed_streams_);
117   STLDeleteValues(&stream_map_);
118 }
119
120 bool QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) {
121   for (size_t i = 0; i < frames.size(); ++i) {
122     // TODO(rch) deal with the error case of stream id 0
123     if (IsClosedStream(frames[i].stream_id)) {
124       // If we get additional frames for a stream where we didn't process
125       // headers, it's highly likely our compression context will end up
126       // permanently out of sync with the peer's, so we give up and close the
127       // connection.
128       if (ContainsKey(prematurely_closed_streams_, frames[i].stream_id)) {
129         connection()->SendConnectionClose(
130             QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
131         return false;
132       }
133       continue;
134     }
135
136     ReliableQuicStream* stream = GetStream(frames[i].stream_id);
137     if (stream == NULL) return false;
138     if (!stream->WillAcceptStreamFrame(frames[i])) return false;
139
140     // TODO(alyssar) check against existing connection address: if changed, make
141     // sure we update the connection.
142   }
143
144   for (size_t i = 0; i < frames.size(); ++i) {
145     QuicStreamId stream_id = frames[i].stream_id;
146     ReliableQuicStream* stream = GetStream(stream_id);
147     if (!stream) {
148       continue;
149     }
150     stream->OnStreamFrame(frames[i]);
151
152     // If the stream is a data stream had been prematurely closed, and the
153     // headers are now decompressed, then we are finally finished
154     // with this stream.
155     if (ContainsKey(zombie_streams_, stream_id) &&
156         static_cast<QuicDataStream*>(stream)->headers_decompressed()) {
157       CloseZombieStream(stream_id);
158     }
159   }
160
161   while (!decompression_blocked_streams_.empty()) {
162     QuicHeaderId header_id = decompression_blocked_streams_.begin()->first;
163     if (header_id != decompressor_.current_header_id()) {
164       break;
165     }
166     QuicStreamId stream_id = decompression_blocked_streams_.begin()->second;
167     decompression_blocked_streams_.erase(header_id);
168     QuicDataStream* stream = GetDataStream(stream_id);
169     if (!stream) {
170       connection()->SendConnectionClose(
171           QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
172       return false;
173     }
174     stream->OnDecompressorAvailable();
175   }
176   return true;
177 }
178
179 void QuicSession::OnStreamHeaders(QuicStreamId stream_id,
180                                   StringPiece headers_data) {
181   QuicDataStream* stream = GetDataStream(stream_id);
182   if (!stream) {
183     // It's quite possible to receive headers after a stream has been reset.
184     return;
185   }
186   stream->OnStreamHeaders(headers_data);
187 }
188
189 void QuicSession::OnStreamHeadersPriority(QuicStreamId stream_id,
190                                           QuicPriority priority) {
191   QuicDataStream* stream = GetDataStream(stream_id);
192   if (!stream) {
193     // It's quite possible to receive headers after a stream has been reset.
194     return;
195   }
196   stream->OnStreamHeadersPriority(priority);
197 }
198
199 void QuicSession::OnStreamHeadersComplete(QuicStreamId stream_id,
200                                           bool fin,
201                                           size_t frame_len) {
202   QuicDataStream* stream = GetDataStream(stream_id);
203   if (!stream) {
204     // It's quite possible to receive headers after a stream has been reset.
205     return;
206   }
207   stream->OnStreamHeadersComplete(fin, frame_len);
208 }
209
210 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
211   if (frame.stream_id == kCryptoStreamId) {
212     connection()->SendConnectionCloseWithDetails(
213         QUIC_INVALID_STREAM_ID,
214         "Attempt to reset the crypto stream");
215     return;
216   }
217   if (frame.stream_id == kHeadersStreamId &&
218       connection()->version() > QUIC_VERSION_12) {
219     connection()->SendConnectionCloseWithDetails(
220         QUIC_INVALID_STREAM_ID,
221         "Attempt to reset the headers stream");
222     return;
223   }
224   QuicDataStream* stream = GetDataStream(frame.stream_id);
225   if (!stream) {
226     return;  // Errors are handled by GetStream.
227   }
228   if (ContainsKey(zombie_streams_, stream->id())) {
229     // If this was a zombie stream then we close it out now.
230     CloseZombieStream(stream->id());
231     // However, since the headers still have not been decompressed, we want to
232     // mark it a prematurely closed so that if we ever receive frames
233     // for this stream we can close the connection.
234     DCHECK(!stream->headers_decompressed());
235     AddPrematurelyClosedStream(frame.stream_id);
236     return;
237   }
238   if (connection()->version() <= QUIC_VERSION_12) {
239     if (stream->stream_bytes_read() > 0 && !stream->headers_decompressed()) {
240       connection()->SendConnectionClose(
241           QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
242     }
243   }
244   stream->OnStreamReset(frame.error_code);
245 }
246
247 void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
248   DCHECK(frame.last_good_stream_id < next_stream_id_);
249   goaway_received_ = true;
250 }
251
252 void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) {
253   DCHECK(!connection_->connected());
254   if (error_ == QUIC_NO_ERROR) {
255     error_ = error;
256   }
257
258   while (!stream_map_.empty()) {
259     DataStreamMap::iterator it = stream_map_.begin();
260     QuicStreamId id = it->first;
261     it->second->OnConnectionClosed(error, from_peer);
262     // The stream should call CloseStream as part of OnConnectionClosed.
263     if (stream_map_.find(id) != stream_map_.end()) {
264       LOG(DFATAL) << ENDPOINT
265                   << "Stream failed to close under OnConnectionClosed";
266       CloseStream(id);
267     }
268   }
269 }
270
271 bool QuicSession::OnCanWrite() {
272   // We latch this here rather than doing a traditional loop, because streams
273   // may be modifying the list as we loop.
274   int remaining_writes = write_blocked_streams_.NumBlockedStreams();
275
276   while (remaining_writes > 0 && connection_->CanWriteStreamData()) {
277     DCHECK(write_blocked_streams_.HasWriteBlockedStreams());
278     if (!write_blocked_streams_.HasWriteBlockedStreams()) {
279       LOG(DFATAL) << "WriteBlockedStream is missing";
280       connection_->CloseConnection(QUIC_INTERNAL_ERROR, false);
281       return true;  // We have no write blocked streams.
282     }
283     QuicStreamId stream_id = write_blocked_streams_.PopFront();
284     if (stream_id == kCryptoStreamId) {
285       has_pending_handshake_ = false;  // We just popped it.
286     }
287     ReliableQuicStream* stream = GetStream(stream_id);
288     if (stream != NULL) {
289       // If the stream can't write all bytes, it'll re-add itself to the blocked
290       // list.
291       stream->OnCanWrite();
292     }
293     --remaining_writes;
294   }
295
296   return !write_blocked_streams_.HasWriteBlockedStreams();
297 }
298
299 bool QuicSession::HasPendingHandshake() const {
300   return has_pending_handshake_;
301 }
302
303 QuicConsumedData QuicSession::WritevData(
304     QuicStreamId id,
305     const struct iovec* iov,
306     int iov_count,
307     QuicStreamOffset offset,
308     bool fin,
309     QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
310   IOVector data;
311   data.AppendIovec(iov, iov_count);
312   return connection_->SendStreamData(id, data, offset, fin,
313                                      ack_notifier_delegate);
314 }
315
316 size_t QuicSession::WriteHeaders(QuicStreamId id,
317                                const SpdyHeaderBlock& headers,
318                                bool fin) {
319   DCHECK_LT(QUIC_VERSION_12, connection()->version());
320   if (connection()->version() <= QUIC_VERSION_12) {
321     return 0;
322   }
323   return headers_stream_->WriteHeaders(id, headers, fin);
324 }
325
326 void QuicSession::SendRstStream(QuicStreamId id,
327                                 QuicRstStreamErrorCode error) {
328   connection_->SendRstStream(id, error);
329   CloseStreamInner(id, true);
330 }
331
332 void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) {
333   goaway_sent_ = true;
334   connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason);
335 }
336
337 void QuicSession::CloseStream(QuicStreamId stream_id) {
338   CloseStreamInner(stream_id, false);
339 }
340
341 void QuicSession::CloseStreamInner(QuicStreamId stream_id,
342                                    bool locally_reset) {
343   DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
344
345   DataStreamMap::iterator it = stream_map_.find(stream_id);
346   if (it == stream_map_.end()) {
347     DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
348     return;
349   }
350   QuicDataStream* stream = it->second;
351   if (connection_->version() <= QUIC_VERSION_12 &&
352       connection_->connected() && !stream->headers_decompressed()) {
353     // If the stream is being closed locally (for example a client cancelling
354     // a request before receiving the response) then we need to make sure that
355     // we keep the stream alive long enough to process any response or
356     // RST_STREAM frames.
357     if (locally_reset && !is_server()) {
358       AddZombieStream(stream_id);
359       return;
360     }
361
362     // This stream has been closed before the headers were decompressed.
363     // This might cause problems with head of line blocking of headers.
364     // If the peer sent headers which were lost but we now close the stream
365     // we will never be able to decompress headers for other streams.
366     // To deal with this, we keep track of streams which have been closed
367     // prematurely.  If we ever receive data frames for this steam, then we
368     // know there actually has been a problem and we close the connection.
369     AddPrematurelyClosedStream(stream->id());
370   }
371   closed_streams_.push_back(it->second);
372   if (ContainsKey(zombie_streams_, stream->id())) {
373     zombie_streams_.erase(stream->id());
374   }
375   stream_map_.erase(it);
376   stream->OnClose();
377 }
378
379 void QuicSession::AddZombieStream(QuicStreamId stream_id) {
380   if (zombie_streams_.size() == kMaxZombieStreams) {
381     QuicStreamId oldest_zombie_stream_id = zombie_streams_.begin()->first;
382     CloseZombieStream(oldest_zombie_stream_id);
383     // However, since the headers still have not been decompressed, we want to
384     // mark it a prematurely closed so that if we ever receive frames
385     // for this stream we can close the connection.
386     AddPrematurelyClosedStream(oldest_zombie_stream_id);
387   }
388   zombie_streams_.insert(make_pair(stream_id, true));
389 }
390
391 void QuicSession::CloseZombieStream(QuicStreamId stream_id) {
392   DCHECK(ContainsKey(zombie_streams_, stream_id));
393   zombie_streams_.erase(stream_id);
394   QuicDataStream* stream = GetDataStream(stream_id);
395   if (!stream) {
396     return;
397   }
398   stream_map_.erase(stream_id);
399   stream->OnClose();
400   closed_streams_.push_back(stream);
401 }
402
403 void QuicSession::AddPrematurelyClosedStream(QuicStreamId stream_id) {
404   if (connection()->version() > QUIC_VERSION_12) {
405     return;
406   }
407   if (prematurely_closed_streams_.size() ==
408       kMaxPrematurelyClosedStreamsTracked) {
409     prematurely_closed_streams_.erase(prematurely_closed_streams_.begin());
410   }
411   prematurely_closed_streams_.insert(make_pair(stream_id, true));
412 }
413
414 bool QuicSession::IsEncryptionEstablished() {
415   return GetCryptoStream()->encryption_established();
416 }
417
418 bool QuicSession::IsCryptoHandshakeConfirmed() {
419   return GetCryptoStream()->handshake_confirmed();
420 }
421
422 void QuicSession::OnConfigNegotiated() {
423   connection_->SetFromConfig(config_);
424 }
425
426 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
427   switch (event) {
428     // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
429     // to QuicSession since it is the glue.
430     case ENCRYPTION_FIRST_ESTABLISHED:
431       break;
432
433     case ENCRYPTION_REESTABLISHED:
434       // Retransmit originally packets that were sent, since they can't be
435       // decrypted by the peer.
436       connection_->RetransmitUnackedPackets(INITIAL_ENCRYPTION_ONLY);
437       break;
438
439     case HANDSHAKE_CONFIRMED:
440       LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT
441           << "Handshake confirmed without parameter negotiation.";
442       connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
443       max_open_streams_ = config_.max_streams_per_connection();
444       break;
445
446     default:
447       LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
448   }
449 }
450
451 void QuicSession::OnCryptoHandshakeMessageSent(
452     const CryptoHandshakeMessage& message) {
453 }
454
455 void QuicSession::OnCryptoHandshakeMessageReceived(
456     const CryptoHandshakeMessage& message) {
457 }
458
459 QuicConfig* QuicSession::config() {
460   return &config_;
461 }
462
463 void QuicSession::ActivateStream(QuicDataStream* stream) {
464   DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size()
465              << ". activating " << stream->id();
466   DCHECK_EQ(stream_map_.count(stream->id()), 0u);
467   stream_map_[stream->id()] = stream;
468 }
469
470 QuicStreamId QuicSession::GetNextStreamId() {
471   QuicStreamId id = next_stream_id_;
472   next_stream_id_ += 2;
473   return id;
474 }
475
476 ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) {
477   if (stream_id == kCryptoStreamId) {
478     return GetCryptoStream();
479   }
480   if (stream_id == kHeadersStreamId &&
481       connection_->version() > QUIC_VERSION_12) {
482     return headers_stream_.get();
483   }
484   return GetDataStream(stream_id);
485 }
486
487 QuicDataStream* QuicSession::GetDataStream(const QuicStreamId stream_id) {
488   if (stream_id == kCryptoStreamId) {
489     DLOG(FATAL) << "Attempt to call GetDataStream with the crypto stream id";
490     return NULL;
491   }
492   if (stream_id == kHeadersStreamId &&
493       connection_->version() > QUIC_VERSION_12) {
494     DLOG(FATAL) << "Attempt to call GetDataStream with the headers stream id";
495     return NULL;
496   }
497
498   DataStreamMap::iterator it = stream_map_.find(stream_id);
499   if (it != stream_map_.end()) {
500     return it->second;
501   }
502
503   if (IsClosedStream(stream_id)) {
504     return NULL;
505   }
506
507   if (stream_id % 2 == next_stream_id_ % 2) {
508     // We've received a frame for a locally-created stream that is not
509     // currently active.  This is an error.
510     if (connection()->connected()) {
511       connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM);
512     }
513     return NULL;
514   }
515
516   return GetIncomingDataStream(stream_id);
517 }
518
519 QuicDataStream* QuicSession::GetIncomingDataStream(QuicStreamId stream_id) {
520   if (IsClosedStream(stream_id)) {
521     return NULL;
522   }
523
524   if (goaway_sent_) {
525     // We've already sent a GoAway
526     SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY);
527     return NULL;
528   }
529
530   implicitly_created_streams_.erase(stream_id);
531   if (stream_id > largest_peer_created_stream_id_) {
532     // TODO(rch) add unit test for this
533     if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) {
534       connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID);
535       return NULL;
536     }
537     if (largest_peer_created_stream_id_ == 0) {
538       if (is_server() && connection()->version() > QUIC_VERSION_12) {
539         largest_peer_created_stream_id_= 3;
540       } else {
541         largest_peer_created_stream_id_= 1;
542       }
543     }
544     for (QuicStreamId id = largest_peer_created_stream_id_ + 2;
545          id < stream_id;
546          id += 2) {
547       implicitly_created_streams_.insert(id);
548     }
549     largest_peer_created_stream_id_ = stream_id;
550   }
551   QuicDataStream* stream = CreateIncomingDataStream(stream_id);
552   if (stream == NULL) {
553     return NULL;
554   }
555   ActivateStream(stream);
556   return stream;
557 }
558
559 bool QuicSession::IsClosedStream(QuicStreamId id) {
560   DCHECK_NE(0u, id);
561   if (id == kCryptoStreamId) {
562     return false;
563   }
564   if (connection()->version() > QUIC_VERSION_12) {
565     if (id == kHeadersStreamId) {
566       return false;
567     }
568   }
569   if (ContainsKey(zombie_streams_, id)) {
570     return true;
571   }
572   if (ContainsKey(stream_map_, id)) {
573     // Stream is active
574     return false;
575   }
576   if (id % 2 == next_stream_id_ % 2) {
577     // Locally created streams are strictly in-order.  If the id is in the
578     // range of created streams and it's not active, it must have been closed.
579     return id < next_stream_id_;
580   }
581   // For peer created streams, we also need to consider implicitly created
582   // streams.
583   return id <= largest_peer_created_stream_id_ &&
584       implicitly_created_streams_.count(id) == 0;
585 }
586
587 size_t QuicSession::GetNumOpenStreams() const {
588   return stream_map_.size() + implicitly_created_streams_.size() -
589       zombie_streams_.size();
590 }
591
592 void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) {
593 #ifndef NDEBUG
594   ReliableQuicStream* stream = GetStream(id);
595   if (stream != NULL) {
596     LOG_IF(DFATAL, priority != stream->EffectivePriority())
597         << "Priorities do not match.  Got: " << priority
598         << " Expected: " << stream->EffectivePriority();
599   } else {
600     LOG(DFATAL) << "Marking unknown stream " << id << " blocked.";
601   }
602 #endif
603
604   if (id == kCryptoStreamId) {
605     DCHECK(!has_pending_handshake_);
606     has_pending_handshake_ = true;
607     // TODO(jar): Be sure to use the highest priority for the crypto stream,
608     // perhaps by adding a "special" priority for it that is higher than
609     // kHighestPriority.
610     priority = kHighestPriority;
611   }
612   write_blocked_streams_.PushBack(id, priority, connection()->version());
613 }
614
615 bool QuicSession::HasDataToWrite() const {
616   return write_blocked_streams_.HasWriteBlockedStreams() ||
617       connection_->HasQueuedData();
618 }
619
620 void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id,
621                                            QuicStreamId stream_id) {
622   DCHECK_GE(QUIC_VERSION_12, connection()->version());
623   decompression_blocked_streams_[header_id] = stream_id;
624 }
625
626 bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) {
627   NOTIMPLEMENTED();
628   return false;
629 }
630
631 void QuicSession::PostProcessAfterData() {
632   STLDeleteElements(&closed_streams_);
633   closed_streams_.clear();
634 }
635
636 }  // namespace net