Upstream version 7.36.149.0
[platform/framework/web/crosswalk.git] / src / net / quic / quic_stream_sequencer.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/quic/quic_stream_sequencer.h"
6
7 #include <algorithm>
8 #include <limits>
9
10 #include "base/logging.h"
11 #include "base/metrics/sparse_histogram.h"
12 #include "net/quic/reliable_quic_stream.h"
13
14 using std::min;
15 using std::numeric_limits;
16
17 namespace net {
18
19 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream)
20     : stream_(quic_stream),
21       num_bytes_consumed_(0),
22       close_offset_(numeric_limits<QuicStreamOffset>::max()),
23       blocked_(false),
24       num_bytes_buffered_(0),
25       num_frames_received_(0),
26       num_duplicate_frames_received_(0) {
27 }
28
29 QuicStreamSequencer::~QuicStreamSequencer() {
30 }
31
32 bool QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
33   ++num_frames_received_;
34   if (IsDuplicate(frame)) {
35     ++num_duplicate_frames_received_;
36     // Silently ignore duplicates.
37     return true;
38   }
39
40   QuicStreamOffset byte_offset = frame.offset;
41   size_t data_len = frame.data.TotalBufferSize();
42   if (data_len == 0 && !frame.fin) {
43     // Stream frames must have data or a fin flag.
44     stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME,
45                                         "Empty stream frame without FIN set.");
46     return false;
47   }
48
49   if (frame.fin) {
50     CloseStreamAtOffset(frame.offset + data_len);
51     if (data_len == 0) {
52       return true;
53     }
54   }
55
56   IOVector data;
57   data.AppendIovec(frame.data.iovec(), frame.data.Size());
58
59   // If the frame has arrived in-order then we can process it immediately, only
60   // buffering if the stream is unable to process it.
61   if (!blocked_ && byte_offset == num_bytes_consumed_) {
62     DVLOG(1) << "Processing byte offset " << byte_offset;
63     size_t bytes_consumed = 0;
64     for (size_t i = 0; i < data.Size(); ++i) {
65       bytes_consumed += stream_->ProcessRawData(
66           static_cast<char*>(data.iovec()[i].iov_base),
67           data.iovec()[i].iov_len);
68     }
69     num_bytes_consumed_ += bytes_consumed;
70     stream_->AddBytesConsumed(bytes_consumed);
71     stream_->MaybeSendWindowUpdate();
72
73     if (MaybeCloseStream()) {
74       return true;
75     }
76     if (bytes_consumed > data_len) {
77       stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
78       return false;
79     } else if (bytes_consumed == data_len) {
80       FlushBufferedFrames();
81       return true;  // it's safe to ack this frame.
82     } else {
83       // Set ourselves up to buffer what's left.
84       data_len -= bytes_consumed;
85       data.Consume(bytes_consumed);
86       byte_offset += bytes_consumed;
87     }
88   }
89
90   // Buffer any remaining data to be consumed by the stream when ready.
91   for (size_t i = 0; i < data.Size(); ++i) {
92     DVLOG(1) << "Buffering stream data at offset " << byte_offset;
93     const iovec& iov = data.iovec()[i];
94     frames_.insert(make_pair(
95         byte_offset, string(static_cast<char*>(iov.iov_base), iov.iov_len)));
96     byte_offset += iov.iov_len;
97     num_bytes_buffered_ += iov.iov_len;
98     stream_->AddBytesBuffered(iov.iov_len);
99   }
100   return true;
101 }
102
103 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
104   const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max();
105
106   // If we have a scheduled termination or close, any new offset should match
107   // it.
108   if (close_offset_ != kMaxOffset && offset != close_offset_) {
109     stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS);
110     return;
111   }
112
113   close_offset_ = offset;
114
115   MaybeCloseStream();
116 }
117
118 bool QuicStreamSequencer::MaybeCloseStream() {
119   if (!blocked_ && IsClosed()) {
120     DVLOG(1) << "Passing up termination, as we've processed "
121              << num_bytes_consumed_ << " of " << close_offset_
122              << " bytes.";
123     // Technically it's an error if num_bytes_consumed isn't exactly
124     // equal, but error handling seems silly at this point.
125     stream_->OnFinRead();
126     frames_.clear();
127     num_bytes_buffered_ = 0;
128     return true;
129   }
130   return false;
131 }
132
133 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) {
134   DCHECK(!blocked_);
135   FrameMap::iterator it = frames_.begin();
136   size_t index = 0;
137   QuicStreamOffset offset = num_bytes_consumed_;
138   while (it != frames_.end() && index < iov_len) {
139     if (it->first != offset) return index;
140
141     iov[index].iov_base = static_cast<void*>(
142         const_cast<char*>(it->second.data()));
143     iov[index].iov_len = it->second.size();
144     offset += it->second.size();
145
146     ++index;
147     ++it;
148   }
149   return index;
150 }
151
152 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
153   DCHECK(!blocked_);
154   FrameMap::iterator it = frames_.begin();
155   size_t iov_index = 0;
156   size_t iov_offset = 0;
157   size_t frame_offset = 0;
158   size_t initial_bytes_consumed = num_bytes_consumed_;
159
160   while (iov_index < iov_len &&
161          it != frames_.end() &&
162          it->first == num_bytes_consumed_) {
163     int bytes_to_read = min(iov[iov_index].iov_len - iov_offset,
164                             it->second.size() - frame_offset);
165
166     char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset;
167     memcpy(iov_ptr,
168            it->second.data() + frame_offset, bytes_to_read);
169     frame_offset += bytes_to_read;
170     iov_offset += bytes_to_read;
171
172     if (iov[iov_index].iov_len == iov_offset) {
173       // We've filled this buffer.
174       iov_offset = 0;
175       ++iov_index;
176     }
177     if (it->second.size() == frame_offset) {
178       // We've copied this whole frame
179       RecordBytesConsumed(it->second.size());
180       frames_.erase(it);
181       it = frames_.begin();
182       frame_offset = 0;
183     }
184   }
185   // We've finished copying.  If we have a partial frame, update it.
186   if (frame_offset != 0) {
187     frames_.insert(make_pair(it->first + frame_offset,
188                              it->second.substr(frame_offset)));
189     frames_.erase(frames_.begin());
190     RecordBytesConsumed(frame_offset);
191   }
192   return num_bytes_consumed_ - initial_bytes_consumed;
193 }
194
195 bool QuicStreamSequencer::HasBytesToRead() const {
196   FrameMap::const_iterator it = frames_.begin();
197
198   return it != frames_.end() && it->first == num_bytes_consumed_;
199 }
200
201 bool QuicStreamSequencer::IsClosed() const {
202   return num_bytes_consumed_ >= close_offset_;
203 }
204
205 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const {
206   // A frame is duplicate if the frame offset is smaller than our bytes consumed
207   // or we have stored the frame in our map.
208   // TODO(pwestin): Is it possible that a new frame contain more data even if
209   // the offset is the same?
210   return frame.offset < num_bytes_consumed_ ||
211       frames_.find(frame.offset) != frames_.end();
212 }
213
214 void QuicStreamSequencer::SetBlockedUntilFlush() {
215   blocked_ = true;
216 }
217
218 void QuicStreamSequencer::FlushBufferedFrames() {
219   blocked_ = false;
220   FrameMap::iterator it = frames_.find(num_bytes_consumed_);
221   while (it != frames_.end()) {
222     DVLOG(1) << "Flushing buffered packet at offset " << it->first;
223     string* data = &it->second;
224     size_t bytes_consumed = stream_->ProcessRawData(data->c_str(),
225                                                     data->size());
226     RecordBytesConsumed(bytes_consumed);
227     if (MaybeCloseStream()) {
228       return;
229     }
230     if (bytes_consumed > data->size()) {
231       stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);  // Programming error
232       return;
233     } else if (bytes_consumed == data->size()) {
234       frames_.erase(it);
235       it = frames_.find(num_bytes_consumed_);
236     } else {
237       string new_data = it->second.substr(bytes_consumed);
238       frames_.erase(it);
239       frames_.insert(make_pair(num_bytes_consumed_, new_data));
240       return;
241     }
242   }
243   MaybeCloseStream();
244 }
245
246 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) {
247   num_bytes_consumed_ += bytes_consumed;
248   num_bytes_buffered_ -= bytes_consumed;
249
250   stream_->AddBytesConsumed(bytes_consumed);
251   stream_->RemoveBytesBuffered(bytes_consumed);
252   stream_->MaybeSendWindowUpdate();
253 }
254
255 }  // namespace net