- add sources.
[platform/framework/web/crosswalk.git] / src / net / quic / quic_session.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/quic/quic_session.h"
6
7 #include "base/stl_util.h"
8 #include "net/quic/crypto/proof_verifier.h"
9 #include "net/quic/quic_connection.h"
10 #include "net/ssl/ssl_info.h"
11
12 using base::StringPiece;
13 using base::hash_map;
14 using base::hash_set;
15 using std::make_pair;
16 using std::vector;
17
18 namespace net {
19
20 const size_t kMaxPrematurelyClosedStreamsTracked = 20;
21 const size_t kMaxZombieStreams = 20;
22
23 #define ENDPOINT (is_server_ ? "Server: " : " Client: ")
24
25 // We want to make sure we delete any closed streams in a safe manner.
26 // To avoid deleting a stream in mid-operation, we have a simple shim between
27 // us and the stream, so we can delete any streams when we return from
28 // processing.
29 //
30 // We could just override the base methods, but this makes it easier to make
31 // sure we don't miss any.
32 class VisitorShim : public QuicConnectionVisitorInterface {
33  public:
34   explicit VisitorShim(QuicSession* session) : session_(session) {}
35
36   virtual bool OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE {
37     bool accepted = session_->OnStreamFrames(frames);
38     session_->PostProcessAfterData();
39     return accepted;
40   }
41   virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE {
42     session_->OnRstStream(frame);
43     session_->PostProcessAfterData();
44   }
45
46   virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE {
47     session_->OnGoAway(frame);
48     session_->PostProcessAfterData();
49   }
50
51   virtual bool OnCanWrite() OVERRIDE {
52     bool rc = session_->OnCanWrite();
53     session_->PostProcessAfterData();
54     return rc;
55   }
56
57   virtual void OnSuccessfulVersionNegotiation(
58       const QuicVersion& version) OVERRIDE {
59     session_->OnSuccessfulVersionNegotiation(version);
60   }
61
62   virtual void OnConfigNegotiated() OVERRIDE {
63     session_->OnConfigNegotiated();
64   }
65
66   virtual void OnConnectionClosed(QuicErrorCode error,
67                                   bool from_peer) OVERRIDE {
68     session_->OnConnectionClosed(error, from_peer);
69     // The session will go away, so don't bother with cleanup.
70   }
71
72   virtual bool HasPendingHandshake() const OVERRIDE {
73     return session_->HasPendingHandshake();
74   }
75
76  private:
77   QuicSession* session_;
78 };
79
80 QuicSession::QuicSession(QuicConnection* connection,
81                          const QuicConfig& config,
82                          bool is_server)
83     : connection_(connection),
84       visitor_shim_(new VisitorShim(this)),
85       config_(config),
86       max_open_streams_(config_.max_streams_per_connection()),
87       next_stream_id_(is_server ? 2 : 3),
88       is_server_(is_server),
89       largest_peer_created_stream_id_(0),
90       error_(QUIC_NO_ERROR),
91       goaway_received_(false),
92       goaway_sent_(false),
93       has_pending_handshake_(false) {
94
95   connection_->set_visitor(visitor_shim_.get());
96   connection_->SetFromConfig(config_);
97   if (connection_->connected()) {
98     connection_->SetOverallConnectionTimeout(
99         config_.max_time_before_crypto_handshake());
100   }
101 }
102
103 QuicSession::~QuicSession() {
104   STLDeleteElements(&closed_streams_);
105   STLDeleteValues(&stream_map_);
106 }
107
108 bool QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) {
109   for (size_t i = 0; i < frames.size(); ++i) {
110     // TODO(rch) deal with the error case of stream id 0
111     if (IsClosedStream(frames[i].stream_id)) {
112       // If we get additional frames for a stream where we didn't process
113       // headers, it's highly likely our compression context will end up
114       // permanently out of sync with the peer's, so we give up and close the
115       // connection.
116       if (ContainsKey(prematurely_closed_streams_, frames[i].stream_id)) {
117         connection()->SendConnectionClose(
118             QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
119         return false;
120       }
121       continue;
122     }
123
124     ReliableQuicStream* stream = GetStream(frames[i].stream_id);
125     if (stream == NULL) return false;
126     if (!stream->WillAcceptStreamFrame(frames[i])) return false;
127
128     // TODO(alyssar) check against existing connection address: if changed, make
129     // sure we update the connection.
130   }
131
132   for (size_t i = 0; i < frames.size(); ++i) {
133     QuicStreamId stream_id = frames[i].stream_id;
134     ReliableQuicStream* stream = GetStream(stream_id);
135     if (!stream) {
136       continue;
137     }
138     stream->OnStreamFrame(frames[i]);
139
140     // If the stream had been prematurely closed, and the
141     // headers are now decompressed, then we are finally finished
142     // with this stream.
143     if (ContainsKey(zombie_streams_, stream_id) &&
144         stream->headers_decompressed()) {
145       CloseZombieStream(stream_id);
146     }
147   }
148
149   while (!decompression_blocked_streams_.empty()) {
150     QuicHeaderId header_id = decompression_blocked_streams_.begin()->first;
151     if (header_id != decompressor_.current_header_id()) {
152       break;
153     }
154     QuicStreamId stream_id = decompression_blocked_streams_.begin()->second;
155     decompression_blocked_streams_.erase(header_id);
156     ReliableQuicStream* stream = GetStream(stream_id);
157     if (!stream) {
158       connection()->SendConnectionClose(
159           QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
160       return false;
161     }
162     stream->OnDecompressorAvailable();
163   }
164   return true;
165 }
166
167 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
168   ReliableQuicStream* stream = GetStream(frame.stream_id);
169   if (!stream) {
170     return;  // Errors are handled by GetStream.
171   }
172   if (ContainsKey(zombie_streams_, stream->id())) {
173     // If this was a zombie stream then we close it out now.
174     CloseZombieStream(stream->id());
175     // However, since the headers still have not been decompressed, we want to
176     // mark it a prematurely closed so that if we ever receive frames
177     // for this stream we can close the connection.
178     DCHECK(!stream->headers_decompressed());
179     AddPrematurelyClosedStream(frame.stream_id);
180     return;
181   }
182   stream->OnStreamReset(frame.error_code);
183 }
184
185 void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
186   DCHECK(frame.last_good_stream_id < next_stream_id_);
187   goaway_received_ = true;
188 }
189
190 void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) {
191   DCHECK(!connection_->connected());
192   if (error_ == QUIC_NO_ERROR) {
193     error_ = error;
194   }
195
196   while (stream_map_.size() != 0) {
197     ReliableStreamMap::iterator it = stream_map_.begin();
198     QuicStreamId id = it->first;
199     it->second->OnConnectionClosed(error, from_peer);
200     // The stream should call CloseStream as part of OnConnectionClosed.
201     if (stream_map_.find(id) != stream_map_.end()) {
202       LOG(DFATAL) << ENDPOINT
203                   << "Stream failed to close under OnConnectionClosed";
204       CloseStream(id);
205     }
206   }
207 }
208
209 bool QuicSession::OnCanWrite() {
210   // We latch this here rather than doing a traditional loop, because streams
211   // may be modifying the list as we loop.
212   int remaining_writes = write_blocked_streams_.NumBlockedStreams();
213
214   while (!connection_->HasQueuedData() &&
215          remaining_writes > 0) {
216     DCHECK(write_blocked_streams_.HasWriteBlockedStreams());
217     int index = write_blocked_streams_.GetHighestPriorityWriteBlockedList();
218     if (index == -1) {
219       LOG(DFATAL) << "WriteBlockedStream is missing";
220       connection_->CloseConnection(QUIC_INTERNAL_ERROR, false);
221       return true;  // We have no write blocked streams.
222     }
223     QuicStreamId stream_id = write_blocked_streams_.PopFront(index);
224     if (stream_id == kCryptoStreamId) {
225       has_pending_handshake_ = false;  // We just popped it.
226     }
227     ReliableQuicStream* stream = GetStream(stream_id);
228     if (stream != NULL) {
229       // If the stream can't write all bytes, it'll re-add itself to the blocked
230       // list.
231       stream->OnCanWrite();
232     }
233     --remaining_writes;
234   }
235
236   return !write_blocked_streams_.HasWriteBlockedStreams();
237 }
238
239 bool QuicSession::HasPendingHandshake() const {
240   return has_pending_handshake_;
241 }
242
243 QuicConsumedData QuicSession::WritevData(QuicStreamId id,
244                                          const struct iovec* iov,
245                                          int iov_count,
246                                          QuicStreamOffset offset,
247                                          bool fin) {
248   IOVector data;
249   data.AppendIovec(iov, iov_count);
250   return connection_->SendStreamData(id, data, offset, fin);
251 }
252
253 void QuicSession::SendRstStream(QuicStreamId id,
254                                 QuicRstStreamErrorCode error) {
255   connection_->SendRstStream(id, error);
256   CloseStreamInner(id, true);
257 }
258
259 void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) {
260   goaway_sent_ = true;
261   connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason);
262 }
263
264 void QuicSession::CloseStream(QuicStreamId stream_id) {
265   CloseStreamInner(stream_id, false);
266 }
267
268 void QuicSession::CloseStreamInner(QuicStreamId stream_id,
269                                    bool locally_reset) {
270   DLOG(INFO) << ENDPOINT << "Closing stream " << stream_id;
271
272   ReliableStreamMap::iterator it = stream_map_.find(stream_id);
273   if (it == stream_map_.end()) {
274     DLOG(INFO) << ENDPOINT << "Stream is already closed: " << stream_id;
275     return;
276   }
277   ReliableQuicStream* stream = it->second;
278   if (connection_->connected() && !stream->headers_decompressed()) {
279     // If the stream is being closed locally (for example a client cancelling
280     // a request before receiving the response) then we need to make sure that
281     // we keep the stream alive long enough to process any response or
282     // RST_STREAM frames.
283     if (locally_reset && !is_server_) {
284       AddZombieStream(stream_id);
285       return;
286     }
287
288     // This stream has been closed before the headers were decompressed.
289     // This might cause problems with head of line blocking of headers.
290     // If the peer sent headers which were lost but we now close the stream
291     // we will never be able to decompress headers for other streams.
292     // To deal with this, we keep track of streams which have been closed
293     // prematurely.  If we ever receive data frames for this steam, then we
294     // know there actually has been a problem and we close the connection.
295     AddPrematurelyClosedStream(stream->id());
296   }
297   closed_streams_.push_back(it->second);
298   if (ContainsKey(zombie_streams_, stream->id())) {
299     zombie_streams_.erase(stream->id());
300   }
301   stream_map_.erase(it);
302   stream->OnClose();
303 }
304
305 void QuicSession::AddZombieStream(QuicStreamId stream_id) {
306   if (zombie_streams_.size() == kMaxZombieStreams) {
307     QuicStreamId oldest_zombie_stream_id = zombie_streams_.begin()->first;
308     CloseZombieStream(oldest_zombie_stream_id);
309     // However, since the headers still have not been decompressed, we want to
310     // mark it a prematurely closed so that if we ever receive frames
311     // for this stream we can close the connection.
312     AddPrematurelyClosedStream(oldest_zombie_stream_id);
313   }
314   zombie_streams_.insert(make_pair(stream_id, true));
315 }
316
317 void QuicSession::CloseZombieStream(QuicStreamId stream_id) {
318   DCHECK(ContainsKey(zombie_streams_, stream_id));
319   zombie_streams_.erase(stream_id);
320   ReliableQuicStream* stream = GetStream(stream_id);
321   if (!stream) {
322     return;
323   }
324   stream_map_.erase(stream_id);
325   stream->OnClose();
326   closed_streams_.push_back(stream);
327 }
328
329 void QuicSession::AddPrematurelyClosedStream(QuicStreamId stream_id) {
330   if (prematurely_closed_streams_.size() ==
331       kMaxPrematurelyClosedStreamsTracked) {
332     prematurely_closed_streams_.erase(prematurely_closed_streams_.begin());
333   }
334   prematurely_closed_streams_.insert(make_pair(stream_id, true));
335 }
336
337 bool QuicSession::IsEncryptionEstablished() {
338   return GetCryptoStream()->encryption_established();
339 }
340
341 bool QuicSession::IsCryptoHandshakeConfirmed() {
342   return GetCryptoStream()->handshake_confirmed();
343 }
344
345 void QuicSession::OnConfigNegotiated() {
346   connection_->SetFromConfig(config_);
347 }
348
349 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
350   switch (event) {
351     // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
352     // to QuicSession since it is the glue.
353     case ENCRYPTION_FIRST_ESTABLISHED:
354       break;
355
356     case ENCRYPTION_REESTABLISHED:
357       // Retransmit originally packets that were sent, since they can't be
358       // decrypted by the peer.
359       connection_->RetransmitUnackedPackets(
360           QuicConnection::INITIAL_ENCRYPTION_ONLY);
361       break;
362
363     case HANDSHAKE_CONFIRMED:
364       LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT
365           << "Handshake confirmed without parameter negotiation.";
366       connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
367       max_open_streams_ = config_.max_streams_per_connection();
368       break;
369
370     default:
371       LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
372   }
373 }
374
375 void QuicSession::OnCryptoHandshakeMessageSent(
376     const CryptoHandshakeMessage& message) {
377 }
378
379 void QuicSession::OnCryptoHandshakeMessageReceived(
380     const CryptoHandshakeMessage& message) {
381 }
382
383 QuicConfig* QuicSession::config() {
384   return &config_;
385 }
386
387 void QuicSession::ActivateStream(ReliableQuicStream* stream) {
388   DLOG(INFO) << ENDPOINT << "num_streams: " << stream_map_.size()
389              << ". activating " << stream->id();
390   DCHECK_EQ(stream_map_.count(stream->id()), 0u);
391   stream_map_[stream->id()] = stream;
392 }
393
394 QuicStreamId QuicSession::GetNextStreamId() {
395   QuicStreamId id = next_stream_id_;
396   next_stream_id_ += 2;
397   return id;
398 }
399
400 ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) {
401   if (stream_id == kCryptoStreamId) {
402     return GetCryptoStream();
403   }
404
405   ReliableStreamMap::iterator it = stream_map_.find(stream_id);
406   if (it != stream_map_.end()) {
407     return it->second;
408   }
409
410   if (IsClosedStream(stream_id)) {
411     return NULL;
412   }
413
414   if (stream_id % 2 == next_stream_id_ % 2) {
415     // We've received a frame for a locally-created stream that is not
416     // currently active.  This is an error.
417     connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM);
418     return NULL;
419   }
420
421   return GetIncomingReliableStream(stream_id);
422 }
423
424 ReliableQuicStream* QuicSession::GetIncomingReliableStream(
425     QuicStreamId stream_id) {
426   if (IsClosedStream(stream_id)) {
427     return NULL;
428   }
429
430   if (goaway_sent_) {
431     // We've already sent a GoAway
432     SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY);
433     return NULL;
434   }
435
436   implicitly_created_streams_.erase(stream_id);
437   if (stream_id > largest_peer_created_stream_id_) {
438     // TODO(rch) add unit test for this
439     if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) {
440       connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID);
441       return NULL;
442     }
443     if (largest_peer_created_stream_id_ == 0) {
444       largest_peer_created_stream_id_= 1;
445     }
446     for (QuicStreamId id = largest_peer_created_stream_id_ + 2;
447          id < stream_id;
448          id += 2) {
449       implicitly_created_streams_.insert(id);
450     }
451     largest_peer_created_stream_id_ = stream_id;
452   }
453   ReliableQuicStream* stream = CreateIncomingReliableStream(stream_id);
454   if (stream == NULL) {
455     return NULL;
456   }
457   ActivateStream(stream);
458   return stream;
459 }
460
461 bool QuicSession::IsClosedStream(QuicStreamId id) {
462   DCHECK_NE(0u, id);
463   if (id == kCryptoStreamId) {
464     return false;
465   }
466   if (ContainsKey(zombie_streams_, id)) {
467     return true;
468   }
469   if (ContainsKey(stream_map_, id)) {
470     // Stream is active
471     return false;
472   }
473   if (id % 2 == next_stream_id_ % 2) {
474     // Locally created streams are strictly in-order.  If the id is in the
475     // range of created streams and it's not active, it must have been closed.
476     return id < next_stream_id_;
477   }
478   // For peer created streams, we also need to consider implicitly created
479   // streams.
480   return id <= largest_peer_created_stream_id_ &&
481       implicitly_created_streams_.count(id) == 0;
482 }
483
484 size_t QuicSession::GetNumOpenStreams() const {
485   return stream_map_.size() + implicitly_created_streams_.size() -
486       zombie_streams_.size();
487 }
488
489 void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) {
490   if (id == kCryptoStreamId) {
491     DCHECK(!has_pending_handshake_);
492     has_pending_handshake_ = true;
493     // TODO(jar): Be sure to use the highest priority for the crypto stream,
494     // perhaps by adding a "special" priority for it that is higher than
495     // kHighestPriority.
496     priority = kHighestPriority;
497   }
498   write_blocked_streams_.PushBack(id, priority);
499 }
500
501 void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id,
502                                            QuicStreamId stream_id) {
503   decompression_blocked_streams_[header_id] = stream_id;
504 }
505
506 bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) {
507   NOTIMPLEMENTED();
508   return false;
509 }
510
511 void QuicSession::PostProcessAfterData() {
512   STLDeleteElements(&closed_streams_);
513   closed_streams_.clear();
514 }
515
516 }  // namespace net