1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/reliable_quic_stream.h"
7 #include "base/logging.h"
8 #include "net/quic/quic_session.h"
9 #include "net/quic/quic_spdy_decompressor.h"
10 #include "net/quic/quic_write_blocked_list.h"
12 using base::StringPiece;
// Log-line prefix that names the endpoint role. It reads |is_server_|, so it
// only expands correctly where that member is in scope.
// NOTE(review): the client prefix starts with a space while the server prefix
// does not -- confirm whether that is intentional before normalizing it.
17 #define ENDPOINT (is_server_ ? "Server: " : " Client: ")
// Builds an iovec describing the bytes of |data|. The iovec only points at
// the bytes held by |data|; nothing is copied here.
21 struct iovec MakeIovec(StringPiece data) {
// First member: start of the bytes, with constness cast away to fit the
// iovec field. Second member: number of bytes.
22 struct iovec iov = {const_cast<char*>(data.data()),
23 static_cast<size_t>(data.size())};
// Constructs a stream. The initializers shown below start both byte counters
// at zero, record no stream or connection error, and leave the read and write
// sides open.
29 ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
34 stream_bytes_read_(0),
35 stream_bytes_written_(0),
36 stream_error_(QUIC_STREAM_NO_ERROR),
37 connection_error_(QUIC_NO_ERROR),
38 read_side_closed_(false),
39 write_side_closed_(false),
// Cache the role reported by the session; the ENDPOINT log prefix reads it.
43 is_server_(session_->is_server()) {
// Destructor for ReliableQuicStream.
46 ReliableQuicStream::~ReliableQuicStream() {
// Reports whether |frame| would be accepted by this stream. A closed read
// side and a frame addressed to a different stream id are handled first; in
// every other case the decision is delegated to the sequencer.
49 bool ReliableQuicStream::WillAcceptStreamFrame(
50 const QuicStreamFrame& frame) const {
51 if (read_side_closed_) {
// A frame whose stream id does not match this stream is logged as an error.
54 if (frame.stream_id != id_) {
55 LOG(ERROR) << "Error!";
58 return sequencer_.WillAcceptStreamFrame(frame);
// Handles a stream frame addressed to this stream (the id match is
// debug-checked). When the read side is closed the frame is logged and its
// data is not read. Otherwise the total buffer size of the frame is added to
// |stream_bytes_read_| and the frame is passed to the sequencer.
61 bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
62 DCHECK_EQ(frame.stream_id, id_);
63 if (read_side_closed_) {
64 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id;
65 // We don't want to be reading: blackhole the data.
68 // Note: This count includes duplicate data received.
69 stream_bytes_read_ += frame.data.TotalBufferSize();
71 bool accepted = sequencer_.OnStreamFrame(frame);
// Handles a reset frame for this stream by recording the error code carried
// in |frame| as the stream error.
76 void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
77 stream_error_ = frame.error_code;
// Reacts to the connection being closed with |error|. The case where both
// sides of the stream are already closed is checked first (presumably an
// early exit -- TODO confirm). For any error other than QUIC_NO_ERROR, the
// stream error becomes QUIC_STREAM_CONNECTION_ERROR and |error| is stored as
// the connection error.
82 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
84 if (read_side_closed_ && write_side_closed_) {
87 if (error != QUIC_NO_ERROR) {
88 stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
89 connection_error_ = error;
// Handles the FIN having been read. The sequencer is expected to be closed
// by this point, which is debug-checked.
96 void ReliableQuicStream::OnFinRead() {
97 DCHECK(sequencer_.IsClosed());
// Resets the stream with |error|, which must differ from
// QUIC_STREAM_NO_ERROR (debug-checked). Stores |error| as the stream error
// and asks the session to send a RST for this stream id together with the
// number of bytes written so far.
101 void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) {
102 DCHECK_NE(QUIC_STREAM_NO_ERROR, error);
103 stream_error_ = error;
104 // Sending a RstStream results in calling CloseStream.
105 session()->SendRstStream(id(), error, stream_bytes_written_);
// Asks the connection of the owning session to send a connection close
// carrying |error|.
109 void ReliableQuicStream::CloseConnection(QuicErrorCode error) {
110 session()->connection()->SendConnectionClose(error);
// Asks the connection of the owning session to send a connection close
// carrying |error| together with the free-form |details| string.
113 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
114 const string& details) {
115 session()->connection()->SendConnectionCloseWithDetails(error, details);
// Returns the QUIC version reported by the connection of the owning session.
118 QuicVersion ReliableQuicStream::version() {
119 return session()->connection()->version();
// Writes |data|, and a FIN when |fin| is set, or queues whatever could not be
// written. A direct write is attempted only while nothing is queued; any
// bytes that were not consumed, or an unconsumed FIN, are appended to
// |queued_data_|. Passing empty data without a FIN is reported with DFATAL,
// as is the "Fin already buffered" case.
122 void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) {
123 if (data.empty() && !fin) {
124 LOG(DFATAL) << "data.empty() && !fin";
129 LOG(DFATAL) << "Fin already buffered";
// Starts as nothing consumed, which is what the queueing check below sees
// when the direct write is not attempted.
133 QuicConsumedData consumed_data(0, false);
// Write directly only when no earlier data is waiting (presumably so that
// queued data keeps its order -- TODO confirm).
136 if (queued_data_.empty()) {
137 struct iovec iov(MakeIovec(data));
138 consumed_data = WritevData(&iov, 1, fin, NULL);
139 DCHECK_LE(consumed_data.bytes_consumed, data.length());
142 // If there's unconsumed data or an unconsumed fin, queue it.
143 if (consumed_data.bytes_consumed < data.length() ||
144 (fin && !consumed_data.fin_consumed)) {
// Queue a copy of the unconsumed tail of |data|.
145 queued_data_.push_back(
146 string(data.data() + consumed_data.bytes_consumed,
147 data.length() - consumed_data.bytes_consumed));
// Drains |queued_data_| from the front. An entry is removed once all of its
// bytes were consumed and the FIN outcome matches what was requested; after a
// partial write the consumed prefix is erased from the front entry.
151 void ReliableQuicStream::OnCanWrite() {
153 while (!queued_data_.empty()) {
154 const string& data = queued_data_.front();
// Special case: this is the last queued entry and a FIN is buffered.
155 if (queued_data_.size() == 1 && fin_buffered_) {
158 struct iovec iov(MakeIovec(data));
159 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, NULL);
160 if (consumed_data.bytes_consumed == data.size() &&
161 fin == consumed_data.fin_consumed) {
162 queued_data_.pop_front();
// Partial write: keep only the bytes that still need to be sent.
164 queued_data_.front().erase(0, consumed_data.bytes_consumed);
// Writes the buffers in |iov| through the session, starting at offset
// |stream_bytes_written_|, and returns what the session consumed. When the
// write side is already closed nothing is written and (0, false) is returned.
// |stream_bytes_written_| advances by the number of bytes consumed. The
// stream is marked write-blocked when a requested FIN was not consumed, and
// in the last branch below (presumably the partial-write case -- TODO
// confirm).
170 QuicConsumedData ReliableQuicStream::WritevData(
171 const struct iovec* iov,
174 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
175 if (write_side_closed_) {
176 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
177 return QuicConsumedData(0, false);
// Total number of bytes the caller asked to write across all buffers.
180 size_t write_length = 0u;
181 for (int i = 0; i < iov_count; ++i) {
182 write_length += iov[i].iov_len;
184 QuicConsumedData consumed_data = session()->WritevData(
185 id(), iov, iov_count, stream_bytes_written_, fin, ack_notifier_delegate);
186 stream_bytes_written_ += consumed_data.bytes_consumed;
// Every requested byte was consumed; what remains is the FIN outcome.
187 if (consumed_data.bytes_consumed == write_length) {
188 if (fin && consumed_data.fin_consumed) {
191 } else if (fin && !consumed_data.fin_consumed) {
192 session_->MarkWriteBlocked(id(), EffectivePriority());
195 session_->MarkWriteBlocked(id(), EffectivePriority());
197 return consumed_data;
// Marks the read side as closed, after first checking whether it already is.
// Once the write side is closed as well, asks the session to close the
// stream.
200 void ReliableQuicStream::CloseReadSide() {
201 if (read_side_closed_) {
204 DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
206 read_side_closed_ = true;
// Both directions are now closed, so the whole stream can be closed.
207 if (write_side_closed_) {
208 DVLOG(1) << ENDPOINT << "Closing stream: " << id();
209 session_->CloseStream(id());
// Marks the write side as closed, after first checking whether it already
// is. Once the read side is closed as well, asks the session to close the
// stream.
213 void ReliableQuicStream::CloseWriteSide() {
214 if (write_side_closed_) {
217 DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
219 write_side_closed_ = true;
// Both directions are now closed, so the whole stream can be closed.
220 if (read_side_closed_) {
221 DVLOG(1) << ENDPOINT << "Closing stream: " << id();
222 session_->CloseStream(id());
// Returns true while |queued_data_| still holds data waiting to be written.
226 bool ReliableQuicStream::HasBufferedData() {
227 return !queued_data_.empty();
// Runs when the stream is closed. For versions newer than QUIC_VERSION_13,
// when neither a FIN nor a RST has been sent, a RST with QUIC_STREAM_NO_ERROR
// is sent through the session together with |stream_bytes_written_|.
230 void ReliableQuicStream::OnClose() {
234 if (version() > QUIC_VERSION_13 &&
235 !fin_sent_ && !rst_sent_) {
236 // For flow control accounting, we must tell the peer how many bytes we have
237 // written on this stream before termination. Done here if needed, using a
239 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id();
240 session_->SendRstStream(id(), QUIC_STREAM_NO_ERROR, stream_bytes_written_);