1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_stream_sequencer.h"
10 #include "base/logging.h"
11 #include "base/metrics/sparse_histogram.h"
12 #include "net/quic/reliable_quic_stream.h"
15 using std::numeric_limits;
19 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream)
20 : stream_(quic_stream),
21 num_bytes_consumed_(0),
22 close_offset_(numeric_limits<QuicStreamOffset>::max()),
24 num_bytes_buffered_(0),
25 num_frames_received_(0),
26 num_duplicate_frames_received_(0) {
29 QuicStreamSequencer::~QuicStreamSequencer() {
32 bool QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
33 ++num_frames_received_;
34 if (IsDuplicate(frame)) {
35 ++num_duplicate_frames_received_;
36 // Silently ignore duplicates.
40 QuicStreamOffset byte_offset = frame.offset;
41 size_t data_len = frame.data.TotalBufferSize();
42 if (data_len == 0 && !frame.fin) {
43 // Stream frames must have data or a fin flag.
44 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME,
45 "Empty stream frame without FIN set.");
50 CloseStreamAtOffset(frame.offset + data_len);
57 data.AppendIovec(frame.data.iovec(), frame.data.Size());
59 // If the frame has arrived in-order then we can process it immediately, only
60 // buffering if the stream is unable to process it.
61 if (!blocked_ && byte_offset == num_bytes_consumed_) {
62 DVLOG(1) << "Processing byte offset " << byte_offset;
63 size_t bytes_consumed = 0;
64 for (size_t i = 0; i < data.Size(); ++i) {
65 bytes_consumed += stream_->ProcessRawData(
66 static_cast<char*>(data.iovec()[i].iov_base),
67 data.iovec()[i].iov_len);
69 num_bytes_consumed_ += bytes_consumed;
70 stream_->AddBytesConsumed(bytes_consumed);
71 stream_->MaybeSendWindowUpdate();
73 if (MaybeCloseStream()) {
76 if (bytes_consumed > data_len) {
77 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
79 } else if (bytes_consumed == data_len) {
80 FlushBufferedFrames();
81 return true; // it's safe to ack this frame.
83 // Set ourselves up to buffer what's left.
84 data_len -= bytes_consumed;
85 data.Consume(bytes_consumed);
86 byte_offset += bytes_consumed;
90 // Buffer any remaining data to be consumed by the stream when ready.
91 for (size_t i = 0; i < data.Size(); ++i) {
92 DVLOG(1) << "Buffering stream data at offset " << byte_offset;
93 const iovec& iov = data.iovec()[i];
94 frames_.insert(make_pair(
95 byte_offset, string(static_cast<char*>(iov.iov_base), iov.iov_len)));
96 byte_offset += iov.iov_len;
97 num_bytes_buffered_ += iov.iov_len;
98 stream_->AddBytesBuffered(iov.iov_len);
103 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
104 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max();
106 // If we have a scheduled termination or close, any new offset should match
108 if (close_offset_ != kMaxOffset && offset != close_offset_) {
109 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS);
113 close_offset_ = offset;
118 bool QuicStreamSequencer::MaybeCloseStream() {
119 if (!blocked_ && IsClosed()) {
120 DVLOG(1) << "Passing up termination, as we've processed "
121 << num_bytes_consumed_ << " of " << close_offset_
123 // Technically it's an error if num_bytes_consumed isn't exactly
124 // equal, but error handling seems silly at this point.
125 stream_->OnFinRead();
127 num_bytes_buffered_ = 0;
133 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) {
135 FrameMap::iterator it = frames_.begin();
137 QuicStreamOffset offset = num_bytes_consumed_;
138 while (it != frames_.end() && index < iov_len) {
139 if (it->first != offset) return index;
141 iov[index].iov_base = static_cast<void*>(
142 const_cast<char*>(it->second.data()));
143 iov[index].iov_len = it->second.size();
144 offset += it->second.size();
152 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
154 FrameMap::iterator it = frames_.begin();
155 size_t iov_index = 0;
156 size_t iov_offset = 0;
157 size_t frame_offset = 0;
158 size_t initial_bytes_consumed = num_bytes_consumed_;
160 while (iov_index < iov_len &&
161 it != frames_.end() &&
162 it->first == num_bytes_consumed_) {
163 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset,
164 it->second.size() - frame_offset);
166 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset;
168 it->second.data() + frame_offset, bytes_to_read);
169 frame_offset += bytes_to_read;
170 iov_offset += bytes_to_read;
172 if (iov[iov_index].iov_len == iov_offset) {
173 // We've filled this buffer.
177 if (it->second.size() == frame_offset) {
178 // We've copied this whole frame
179 RecordBytesConsumed(it->second.size());
181 it = frames_.begin();
185 // We've finished copying. If we have a partial frame, update it.
186 if (frame_offset != 0) {
187 frames_.insert(make_pair(it->first + frame_offset,
188 it->second.substr(frame_offset)));
189 frames_.erase(frames_.begin());
190 RecordBytesConsumed(frame_offset);
192 return num_bytes_consumed_ - initial_bytes_consumed;
195 bool QuicStreamSequencer::HasBytesToRead() const {
196 FrameMap::const_iterator it = frames_.begin();
198 return it != frames_.end() && it->first == num_bytes_consumed_;
201 bool QuicStreamSequencer::IsClosed() const {
202 return num_bytes_consumed_ >= close_offset_;
205 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const {
206 // A frame is duplicate if the frame offset is smaller than our bytes consumed
207 // or we have stored the frame in our map.
208 // TODO(pwestin): Is it possible that a new frame contain more data even if
209 // the offset is the same?
210 return frame.offset < num_bytes_consumed_ ||
211 frames_.find(frame.offset) != frames_.end();
214 void QuicStreamSequencer::SetBlockedUntilFlush() {
218 void QuicStreamSequencer::FlushBufferedFrames() {
220 FrameMap::iterator it = frames_.find(num_bytes_consumed_);
221 while (it != frames_.end()) {
222 DVLOG(1) << "Flushing buffered packet at offset " << it->first;
223 string* data = &it->second;
224 size_t bytes_consumed = stream_->ProcessRawData(data->c_str(),
226 RecordBytesConsumed(bytes_consumed);
227 if (MaybeCloseStream()) {
230 if (bytes_consumed > data->size()) {
231 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); // Programming error
233 } else if (bytes_consumed == data->size()) {
235 it = frames_.find(num_bytes_consumed_);
237 string new_data = it->second.substr(bytes_consumed);
239 frames_.insert(make_pair(num_bytes_consumed_, new_data));
246 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) {
247 num_bytes_consumed_ += bytes_consumed;
248 num_bytes_buffered_ -= bytes_consumed;
250 stream_->AddBytesConsumed(bytes_consumed);
251 stream_->RemoveBytesBuffered(bytes_consumed);
252 stream_->MaybeSendWindowUpdate();