#include "net/quic/reliable_quic_stream.h"
+#include "base/logging.h"
+#include "net/quic/iovector.h"
+#include "net/quic/quic_flow_controller.h"
#include "net/quic/quic_session.h"
-#include "net/quic/quic_spdy_decompressor.h"
-#include "net/spdy/write_blocked_list.h"
+#include "net/quic/quic_write_blocked_list.h"
using base::StringPiece;
using std::min;
namespace net {
+#define ENDPOINT (is_server_ ? "Server: " : " Client: ")
+
namespace {
-// This is somewhat arbitrary. It's possible, but unlikely, we will either fail
-// to set a priority client-side, or cancel a stream before stripping the
-// priority from the wire server-side. In either case, start out with a
-// priority in the middle.
-QuicPriority kDefaultPriority = 3;
-
-// Appends bytes from data into partial_data_buffer. Once partial_data_buffer
-// reaches 4 bytes, copies the data into 'result' and clears
-// partial_data_buffer.
-// Returns the number of bytes consumed.
-uint32 StripUint32(const char* data, uint32 data_len,
- string* partial_data_buffer,
- uint32* result) {
- DCHECK_GT(4u, partial_data_buffer->length());
- size_t missing_size = 4 - partial_data_buffer->length();
- if (data_len < missing_size) {
- StringPiece(data, data_len).AppendToString(partial_data_buffer);
- return data_len;
- }
- StringPiece(data, missing_size).AppendToString(partial_data_buffer);
- DCHECK_EQ(4u, partial_data_buffer->length());
- memcpy(result, partial_data_buffer->data(), 4);
- partial_data_buffer->clear();
- return missing_size;
+// Builds an iovec that points at (but does not copy or own) the bytes of
+// |data|.  The const_cast is needed because struct iovec exposes a
+// non-const pointer; callers must not mutate the referenced bytes.
+struct iovec MakeIovec(StringPiece data) {
+  struct iovec iov = {const_cast<char*>(data.data()),
+                      static_cast<size_t>(data.size())};
+  return iov;
+}
} // namespace
-ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
- QuicSession* session)
+// Wrapper that aggregates OnAckNotifications for packets sent using
+// WriteOrBufferData and delivers them to the original
+// QuicAckNotifier::DelegateInterface after all bytes written using
+// WriteOrBufferData are acked. This level of indirection is
+// necessary because the delegate interface provides no mechanism that
+// WriteOrBufferData can use to inform it that the write required
+// multiple WritevData calls or that only part of the data has been
+// sent out by the time ACKs start arriving.
+class ReliableQuicStream::ProxyAckNotifierDelegate
+    : public QuicAckNotifier::DelegateInterface {
+ public:
+  explicit ProxyAckNotifierDelegate(DelegateInterface* delegate)
+      : delegate_(delegate),
+        pending_acks_(0),
+        wrote_last_data_(false),
+        num_original_packets_(0),
+        num_original_bytes_(0),
+        num_retransmitted_packets_(0),
+        num_retransmitted_bytes_(0) {
+  }
+
+  // Invoked once per tracked write as its packets are acked.  Accumulates
+  // packet/byte counts; once the final write has both completed
+  // (wrote_last_data_) and been acked (pending_acks_ == 0), forwards the
+  // aggregated totals to the wrapped delegate.
+  // NOTE(review): only the |delta_largest_observed| from the *final* ack
+  // is forwarded; deltas from earlier acks are dropped -- confirm this is
+  // the intended semantics for the aggregated notification.
+  virtual void OnAckNotification(int num_original_packets,
+                                 int num_original_bytes,
+                                 int num_retransmitted_packets,
+                                 int num_retransmitted_bytes,
+                                 QuicTime::Delta delta_largest_observed)
+      OVERRIDE {
+    DCHECK_LT(0, pending_acks_);
+    --pending_acks_;
+    num_original_packets_ += num_original_packets;
+    num_original_bytes_ += num_original_bytes;
+    num_retransmitted_packets_ += num_retransmitted_packets;
+    num_retransmitted_bytes_ += num_retransmitted_bytes;
+
+    if (wrote_last_data_ && pending_acks_ == 0) {
+      delegate_->OnAckNotification(num_original_packets_,
+                                   num_original_bytes_,
+                                   num_retransmitted_packets_,
+                                   num_retransmitted_bytes_,
+                                   delta_largest_observed);
+    }
+  }
+
+  // Records that one more tracked write was handed to the connection.
+  // |last_data| is true when no further writes will follow for this
+  // delegate.
+  void WroteData(bool last_data) {
+    DCHECK(!wrote_last_data_);
+    ++pending_acks_;
+    wrote_last_data_ = last_data;
+  }
+
+ protected:
+  // Delegates are ref counted.
+  virtual ~ProxyAckNotifierDelegate() {
+  }
+
+ private:
+  // Original delegate. delegate_->OnAckNotification will be called when:
+  //   wrote_last_data_ == true and pending_acks_ == 0
+  scoped_refptr<DelegateInterface> delegate_;
+
+  // Number of outstanding acks.
+  int pending_acks_;
+
+  // True if no pending writes remain.
+  bool wrote_last_data_;
+
+  // Accumulators.
+  int num_original_packets_;
+  int num_original_bytes_;
+  int num_retransmitted_packets_;
+  int num_retransmitted_bytes_;
+
+  DISALLOW_COPY_AND_ASSIGN(ProxyAckNotifierDelegate);
+};
+
+// Bundles a chunk of buffered stream data with the (possibly NULL) proxy
+// delegate that must be notified as that chunk is written and acked.
+ReliableQuicStream::PendingData::PendingData(
+    string data_in, scoped_refptr<ProxyAckNotifierDelegate> delegate_in)
+    : data(data_in), delegate(delegate_in) {
+}
+
+ReliableQuicStream::PendingData::~PendingData() {
+}
+
+ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session)
: sequencer_(this),
id_(id),
session_(session),
- visitor_(NULL),
stream_bytes_read_(0),
stream_bytes_written_(0),
- headers_decompressed_(false),
- priority_(kDefaultPriority),
- headers_id_(0),
- decompression_failed_(false),
stream_error_(QUIC_STREAM_NO_ERROR),
connection_error_(QUIC_NO_ERROR),
read_side_closed_(false),
write_side_closed_(false),
- priority_parsed_(false),
fin_buffered_(false),
fin_sent_(false),
- is_server_(session_->is_server()) {
+ rst_sent_(false),
+ is_server_(session_->is_server()),
+ flow_controller_(
+ session_->connection()->version(),
+ id_,
+ is_server_,
+ session_->config()->HasReceivedInitialFlowControlWindowBytes() ?
+ session_->config()->ReceivedInitialFlowControlWindowBytes() :
+ kDefaultFlowControlSendWindow,
+ session_->connection()->max_flow_control_receive_window_bytes(),
+ session_->connection()->max_flow_control_receive_window_bytes()),
+ connection_flow_controller_(session_->connection()->flow_controller()) {
}
ReliableQuicStream::~ReliableQuicStream() {
}
-bool ReliableQuicStream::WillAcceptStreamFrame(
-    const QuicStreamFrame& frame) const {
+// Handles an incoming stream frame.  Returns false only when the frame is
+// fatally invalid (wrong stream, or a flow control violation that closes
+// the connection); data for a closed read side is silently discarded.
+bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
  if (read_side_closed_) {
+    DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id;
+    // We don't want to be reading: blackhole the data.
    return true;
  }
+
  if (frame.stream_id != id_) {
    LOG(ERROR) << "Error!";
    return false;
  }
-  return sequencer_.WillAcceptStreamFrame(frame);
-}
-bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
-  DCHECK_EQ(frame.stream_id, id_);
-  if (read_side_closed_) {
-    DLOG(INFO) << ENDPOINT << "Ignoring frame " << frame.stream_id;
-    // We don't want to be reading: blackhole the data.
-    return true;
-  }
-  // Note: This count include duplicate data received.
-  stream_bytes_read_ += frame.data.length();
+  // Note: this count includes duplicate data received.
+  stream_bytes_read_ += frame.data.TotalBufferSize();
  bool accepted = sequencer_.OnStreamFrame(frame);
+  // Accepting the frame may have exceeded the stream- or connection-level
+  // receive window; if so, tear the connection down instead of continuing.
+  if (flow_controller_.FlowControlViolation() ||
+      connection_flow_controller_->FlowControlViolation()) {
+    session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR);
+    return false;
+  }
+  MaybeSendWindowUpdate();
+
  return accepted;
}
-void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) {
- stream_error_ = error;
- TerminateFromPeer(false); // Full close.
+// Asks both flow controllers to send a WINDOW_UPDATE frame if the peer's
+// remaining send window (stream and/or connection level) has run low.
+void ReliableQuicStream::MaybeSendWindowUpdate() {
+  flow_controller_.MaybeSendWindowUpdate(session()->connection());
+  connection_flow_controller_->MaybeSendWindowUpdate(session()->connection());
+}
+
+// Total number of stream frames delivered to the sequencer.
+int ReliableQuicStream::num_frames_received() const {
+  return sequencer_.num_frames_received();
+}
+
+// Number of frames the sequencer classified as duplicates.
+int ReliableQuicStream::num_duplicate_frames_received() const {
+  return sequencer_.num_duplicate_frames_received();
+}
+
+// Handles a RST_STREAM frame from the peer: records the peer's error code
+// and shuts down both directions of the stream.
+void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
+  stream_error_ = frame.error_code;
+  CloseWriteSide();
+  CloseReadSide();
}
void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
connection_error_ = error;
}
- if (from_peer) {
- TerminateFromPeer(false);
- } else {
- CloseWriteSide();
- CloseReadSide();
- }
+ CloseWriteSide();
+ CloseReadSide();
}
-void ReliableQuicStream::TerminateFromPeer(bool half_close) {
-  if (!half_close) {
-    CloseWriteSide();
-  }
+// Called once the sequencer has delivered all data up to and including the
+// FIN; the read side can then be closed.
+void ReliableQuicStream::OnFinRead() {
+  DCHECK(sequencer_.IsClosed());
  CloseReadSide();
}
-void ReliableQuicStream::Close(QuicRstStreamErrorCode error) {
+// Abnormally terminates the stream from this endpoint with |error|, which
+// must be a real error (graceful termination uses the FIN path instead).
+void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) {
+  DCHECK_NE(QUIC_STREAM_NO_ERROR, error);
  stream_error_ = error;
-  if (error != QUIC_STREAM_NO_ERROR) {
-    // Sending a RstStream results in calling CloseStream.
-    session()->SendRstStream(id(), error);
-  } else {
-    session_->CloseStream(id());
-  }
+  // Sending a RstStream results in calling CloseStream.  The bytes-written
+  // count is included so the peer can finalize flow control accounting.
+  session()->SendRstStream(id(), error, stream_bytes_written_);
+  rst_sent_ = true;
}
void ReliableQuicStream::CloseConnection(QuicErrorCode error) {
session()->connection()->SendConnectionCloseWithDetails(error, details);
}
-size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) {
- if (headers_decompressed_ && decompressed_headers_.empty()) {
- return sequencer_.Readv(iov, iov_len);
- }
- size_t bytes_consumed = 0;
- size_t iov_index = 0;
- while (iov_index < iov_len &&
- decompressed_headers_.length() > bytes_consumed) {
- size_t bytes_to_read = min(iov[iov_index].iov_len,
- decompressed_headers_.length() - bytes_consumed);
- char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
- memcpy(iov_ptr,
- decompressed_headers_.data() + bytes_consumed, bytes_to_read);
- bytes_consumed += bytes_to_read;
- ++iov_index;
- }
- decompressed_headers_.erase(0, bytes_consumed);
- return bytes_consumed;
+// QUIC version in use on this stream's connection.
+QuicVersion ReliableQuicStream::version() const {
+  return session()->connection()->version();
}
-int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) {
- if (headers_decompressed_ && decompressed_headers_.empty()) {
- return sequencer_.GetReadableRegions(iov, iov_len);
- }
- if (iov_len == 0) {
- return 0;
+void ReliableQuicStream::WriteOrBufferData(
+ StringPiece data,
+ bool fin,
+ QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
+ if (data.empty() && !fin) {
+ LOG(DFATAL) << "data.empty() && !fin";
+ return;
}
- iov[0].iov_base = static_cast<void*>(
- const_cast<char*>(decompressed_headers_.data()));
- iov[0].iov_len = decompressed_headers_.length();
- return 1;
-}
-bool ReliableQuicStream::IsHalfClosed() const {
- if (!headers_decompressed_ || !decompressed_headers_.empty()) {
- return false;
+ if (fin_buffered_) {
+ LOG(DFATAL) << "Fin already buffered";
+ return;
}
- return sequencer_.IsHalfClosed();
-}
-
-bool ReliableQuicStream::HasBytesToRead() const {
- return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
-}
-
-const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
- return session_->peer_address();
-}
-
-QuicSpdyCompressor* ReliableQuicStream::compressor() {
- return session_->compressor();
-}
-
-bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) {
- return session_->GetSSLInfo(ssl_info);
-}
-
-QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) {
- DCHECK(data.size() > 0 || fin);
- return WriteOrBuffer(data, fin);
-}
-
-void ReliableQuicStream::set_priority(QuicPriority priority) {
- DCHECK_EQ(0u, stream_bytes_written_);
- priority_ = priority;
-}
-
-QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) {
- DCHECK(!fin_buffered_);
+ scoped_refptr<ProxyAckNotifierDelegate> proxy_delegate;
+ if (ack_notifier_delegate != NULL) {
+ proxy_delegate = new ProxyAckNotifierDelegate(ack_notifier_delegate);
+ }
QuicConsumedData consumed_data(0, false);
fin_buffered_ = fin;
if (queued_data_.empty()) {
- consumed_data = WriteDataInternal(string(data.data(), data.length()), fin);
+ struct iovec iov(MakeIovec(data));
+ consumed_data = WritevData(&iov, 1, fin, proxy_delegate.get());
DCHECK_LE(consumed_data.bytes_consumed, data.length());
}
+ bool write_completed;
// If there's unconsumed data or an unconsumed fin, queue it.
if (consumed_data.bytes_consumed < data.length() ||
(fin && !consumed_data.fin_consumed)) {
- queued_data_.push_back(
- string(data.data() + consumed_data.bytes_consumed,
- data.length() - consumed_data.bytes_consumed));
+ StringPiece remainder(data.substr(consumed_data.bytes_consumed));
+ queued_data_.push_back(PendingData(remainder.as_string(), proxy_delegate));
+ write_completed = false;
+ } else {
+ write_completed = true;
}
- return QuicConsumedData(data.size(), true);
+ if ((proxy_delegate.get() != NULL) &&
+ (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed)) {
+ proxy_delegate->WroteData(write_completed);
+ }
}
// Retries sending queued data when the stream becomes writable again.
// Drains queued_data_ front-to-back until the queue is empty or a write
// is only partially consumed.
void ReliableQuicStream::OnCanWrite() {
  bool fin = false;
  while (!queued_data_.empty()) {
-    const string& data = queued_data_.front();
+    PendingData* pending_data = &queued_data_.front();
+    ProxyAckNotifierDelegate* delegate = pending_data->delegate.get();
    if (queued_data_.size() == 1 && fin_buffered_) {
+      // Last queued chunk: attach the buffered FIN to this write.
      fin = true;
    }
-    QuicConsumedData consumed_data = WriteDataInternal(data, fin);
-    if (consumed_data.bytes_consumed == data.size() &&
+    struct iovec iov(MakeIovec(pending_data->data));
+    QuicConsumedData consumed_data = WritevData(&iov, 1, fin, delegate);
+    if (consumed_data.bytes_consumed == pending_data->data.size() &&
        fin == consumed_data.fin_consumed) {
      queued_data_.pop_front();
+      if (delegate != NULL) {
+        // The whole chunk (and any FIN) was written: no further writes
+        // remain for this delegate.
+        delegate->WroteData(true);
+      }
    } else {
-      queued_data_.front().erase(0, consumed_data.bytes_consumed);
+      if (consumed_data.bytes_consumed > 0) {
+        // Partial write: trim the sent prefix and keep the rest queued.
+        pending_data->data.erase(0, consumed_data.bytes_consumed);
+        if (delegate != NULL) {
+          delegate->WroteData(false);
+        }
+      }
      break;
    }
  }
}
-QuicConsumedData ReliableQuicStream::WriteDataInternal(
- StringPiece data, bool fin) {
- struct iovec iov = {const_cast<char*>(data.data()),
- static_cast<size_t>(data.size())};
- return WritevDataInternal(&iov, 1, fin);
+// Tells the peer (via BLOCKED frames) when this endpoint is flow control
+// limited at the stream and/or connection level.
+void ReliableQuicStream::MaybeSendBlocked() {
+  flow_controller_.MaybeSendBlocked(session()->connection());
+  connection_flow_controller_->MaybeSendBlocked(session()->connection());
+  // If we are connection level flow control blocked, then add the stream
+  // to the write blocked list. It will be given a chance to write when a
+  // connection level WINDOW_UPDATE arrives.
+  if (connection_flow_controller_->IsBlocked() &&
+      !flow_controller_.IsBlocked()) {
+    session_->MarkWriteBlocked(id(), EffectivePriority());
+  }
+}
-QuicConsumedData ReliableQuicStream::WritevDataInternal(const struct iovec* iov,
- int iov_count,
- bool fin) {
+QuicConsumedData ReliableQuicStream::WritevData(
+ const struct iovec* iov,
+ int iov_count,
+ bool fin,
+ QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
if (write_side_closed_) {
DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
return QuicConsumedData(0, false);
}
- size_t write_length = 0u;
- for (int i = 0; i < iov_count; ++i) {
- write_length += iov[i].iov_len;
+ // How much data we want to write.
+ size_t write_length = TotalIovecLength(iov, iov_count);
+
+ // A FIN with zero data payload should not be flow control blocked.
+ bool fin_with_zero_data = (fin && write_length == 0);
+
+ if (flow_controller_.IsEnabled()) {
+ // How much data we are allowed to write from flow control.
+ uint64 send_window = flow_controller_.SendWindowSize();
+ if (connection_flow_controller_->IsEnabled()) {
+ send_window =
+ min(send_window, connection_flow_controller_->SendWindowSize());
+ }
+
+ if (send_window == 0 && !fin_with_zero_data) {
+ // Quick return if we can't send anything.
+ MaybeSendBlocked();
+ return QuicConsumedData(0, false);
+ }
+
+ if (write_length > send_window) {
+ // Don't send the FIN if we aren't going to send all the data.
+ fin = false;
+
+ // Writing more data would be a violation of flow control.
+ write_length = send_window;
+ }
}
- QuicConsumedData consumed_data =
- session()->WritevData(id(), iov, iov_count, stream_bytes_written_, fin);
+
+ // Fill an IOVector with bytes from the iovec.
+ IOVector data;
+ data.AppendIovecAtMostBytes(iov, iov_count, write_length);
+
+ QuicConsumedData consumed_data = session()->WritevData(
+ id(), data, stream_bytes_written_, fin, ack_notifier_delegate);
stream_bytes_written_ += consumed_data.bytes_consumed;
+
+ AddBytesSent(consumed_data.bytes_consumed);
+
if (consumed_data.bytes_consumed == write_length) {
+ if (!fin_with_zero_data) {
+ MaybeSendBlocked();
+ }
if (fin && consumed_data.fin_consumed) {
fin_sent_ = true;
CloseWriteSide();
return consumed_data;
}
-QuicPriority ReliableQuicStream::EffectivePriority() const {
- return priority();
-}
-
// Shuts down the read side.  Idempotent; the stream is unregistered from
// the session only once both directions are closed.
void ReliableQuicStream::CloseReadSide() {
  if (read_side_closed_) {
    return;
  }
-  DLOG(INFO) << ENDPOINT << "Done reading from stream " << id();
+  DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
  read_side_closed_ = true;
  if (write_side_closed_) {
-    DLOG(INFO) << ENDPOINT << "Closing stream: " << id();
+    DVLOG(1) << ENDPOINT << "Closing stream: " << id();
    session_->CloseStream(id());
  }
}
-uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
- DCHECK_NE(0u, data_len);
- if (id() == kCryptoStreamId) {
- // The crypto stream does not use compression.
- return ProcessData(data, data_len);
- }
-
- uint32 total_bytes_consumed = 0;
- if (headers_id_ == 0u) {
- total_bytes_consumed += StripPriorityAndHeaderId(data, data_len);
- data += total_bytes_consumed;
- data_len -= total_bytes_consumed;
- if (data_len == 0 || !session_->connection()->connected()) {
- return total_bytes_consumed;
- }
- }
- DCHECK_NE(0u, headers_id_);
-
- // Once the headers are finished, we simply pass the data through.
- if (headers_decompressed_) {
- // Some buffered header data remains.
- if (!decompressed_headers_.empty()) {
- ProcessHeaderData();
- }
- if (decompressed_headers_.empty()) {
- DVLOG(1) << "Delegating procesing to ProcessData";
- total_bytes_consumed += ProcessData(data, data_len);
- }
- return total_bytes_consumed;
- }
-
- QuicHeaderId current_header_id =
- session_->decompressor()->current_header_id();
- // Ensure that this header id looks sane.
- if (headers_id_ < current_header_id ||
- headers_id_ > kMaxHeaderIdDelta + current_header_id) {
- DVLOG(1) << ENDPOINT
- << "Invalid headers for stream: " << id()
- << " header_id: " << headers_id_
- << " current_header_id: " << current_header_id;
- session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
- return total_bytes_consumed;
- }
-
- // If we are head-of-line blocked on decompression, then back up.
- if (current_header_id != headers_id_) {
- session_->MarkDecompressionBlocked(headers_id_, id());
- DVLOG(1) << ENDPOINT
- << "Unable to decompress header data for stream: " << id()
- << " header_id: " << headers_id_;
- return total_bytes_consumed;
- }
-
- // Decompressed data will be delivered to decompressed_headers_.
- size_t bytes_consumed = session_->decompressor()->DecompressData(
- StringPiece(data, data_len), this);
- DCHECK_NE(0u, bytes_consumed);
- if (bytes_consumed > data_len) {
- DCHECK(false) << "DecompressData returned illegal value";
- OnDecompressionError();
- return total_bytes_consumed;
- }
- total_bytes_consumed += bytes_consumed;
- data += bytes_consumed;
- data_len -= bytes_consumed;
-
- if (decompression_failed_) {
- // The session will have been closed in OnDecompressionError.
- return total_bytes_consumed;
- }
-
- // Headers are complete if the decompressor has moved on to the
- // next stream.
- headers_decompressed_ =
- session_->decompressor()->current_header_id() != headers_id_;
- if (!headers_decompressed_) {
- DCHECK_EQ(0u, data_len);
- }
-
- ProcessHeaderData();
-
- if (!headers_decompressed_ || !decompressed_headers_.empty()) {
- return total_bytes_consumed;
+void ReliableQuicStream::CloseWriteSide() {
+ if (write_side_closed_) {
+ return;
}
+ DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
- // We have processed all of the decompressed data but we might
- // have some more raw data to process.
- if (data_len > 0) {
- total_bytes_consumed += ProcessData(data, data_len);
+ write_side_closed_ = true;
+ if (read_side_closed_) {
+ DVLOG(1) << ENDPOINT << "Closing stream: " << id();
+ session_->CloseStream(id());
}
+}
- // The sequencer will push any additional buffered frames if this data
- // has been completely consumed.
- return total_bytes_consumed;
+// True if writes are queued locally, waiting to be sent.
+bool ReliableQuicStream::HasBufferedData() const {
+  return !queued_data_.empty();
}
-uint32 ReliableQuicStream::ProcessHeaderData() {
- if (decompressed_headers_.empty()) {
- return 0;
- }
+void ReliableQuicStream::OnClose() {
+ CloseReadSide();
+ CloseWriteSide();
- size_t bytes_processed = ProcessData(decompressed_headers_.data(),
- decompressed_headers_.length());
- if (bytes_processed == decompressed_headers_.length()) {
- decompressed_headers_.clear();
- } else {
- decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
+ if (!fin_sent_ && !rst_sent_) {
+ // For flow control accounting, we must tell the peer how many bytes we have
+ // written on this stream before termination. Done here if needed, using a
+ // RST frame.
+ DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id();
+ session_->SendRstStream(id(), QUIC_RST_FLOW_CONTROL_ACCOUNTING,
+ stream_bytes_written_);
+ rst_sent_ = true;
}
- return bytes_processed;
}
-void ReliableQuicStream::OnDecompressorAvailable() {
- DCHECK_EQ(headers_id_,
- session_->decompressor()->current_header_id());
- DCHECK(!headers_decompressed_);
- DCHECK(!decompression_failed_);
- DCHECK_EQ(0u, decompressed_headers_.length());
-
- while (!headers_decompressed_) {
- struct iovec iovec;
- if (sequencer_.GetReadableRegions(&iovec, 1) == 0) {
- return;
- }
-
- size_t bytes_consumed = session_->decompressor()->DecompressData(
- StringPiece(static_cast<char*>(iovec.iov_base),
- iovec.iov_len),
- this);
- DCHECK_LE(bytes_consumed, iovec.iov_len);
- if (decompression_failed_) {
- return;
- }
- sequencer_.MarkConsumed(bytes_consumed);
-
- headers_decompressed_ =
- session_->decompressor()->current_header_id() != headers_id_;
+void ReliableQuicStream::OnWindowUpdateFrame(
+ const QuicWindowUpdateFrame& frame) {
+ if (!flow_controller_.IsEnabled()) {
+ DLOG(DFATAL) << "Flow control not enabled! " << version();
+ return;
}
- // Either the headers are complete, or the all data as been consumed.
- ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_.
- if (IsHalfClosed()) {
- TerminateFromPeer(true);
- } else if (headers_decompressed_ && decompressed_headers_.empty()) {
- sequencer_.FlushBufferedFrames();
+ if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) {
+ // We can write again!
+ // TODO(rjshade): This does not respect priorities (e.g. multiple
+ // outstanding POSTs are unblocked on arrival of
+ // SHLO with initial window).
+ // As long as the connection is not flow control blocked, we can write!
+ OnCanWrite();
}
}
-bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
- data.AppendToString(&decompressed_headers_);
- return true;
-}
-
-void ReliableQuicStream::OnDecompressionError() {
- DCHECK(!decompression_failed_);
- decompression_failed_ = true;
- session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
-}
-
-
-void ReliableQuicStream::CloseWriteSide() {
- if (write_side_closed_) {
- return;
+void ReliableQuicStream::AddBytesBuffered(uint64 bytes) {
+ if (flow_controller_.IsEnabled()) {
+ flow_controller_.AddBytesBuffered(bytes);
+ connection_flow_controller_->AddBytesBuffered(bytes);
}
- DLOG(INFO) << ENDPOINT << "Done writing to stream " << id();
+}
- write_side_closed_ = true;
- if (read_side_closed_) {
- DLOG(INFO) << ENDPOINT << "Closing stream: " << id();
- session_->CloseStream(id());
+void ReliableQuicStream::RemoveBytesBuffered(uint64 bytes) {
+ if (flow_controller_.IsEnabled()) {
+ flow_controller_.RemoveBytesBuffered(bytes);
+ connection_flow_controller_->RemoveBytesBuffered(bytes);
}
}
-bool ReliableQuicStream::HasBufferedData() {
- return !queued_data_.empty();
+// Flow control accounting: charges |bytes| just written against both the
+// stream-level and connection-level send windows.
+void ReliableQuicStream::AddBytesSent(uint64 bytes) {
+  if (flow_controller_.IsEnabled()) {
+    flow_controller_.AddBytesSent(bytes);
+    connection_flow_controller_->AddBytesSent(bytes);
+  }
+}
-void ReliableQuicStream::OnClose() {
- CloseReadSide();
- CloseWriteSide();
-
- if (visitor_) {
- Visitor* visitor = visitor_;
- // Calling Visitor::OnClose() may result the destruction of the visitor,
- // so we need to ensure we don't call it again.
- visitor_ = NULL;
- visitor->OnClose(this);
+// Flow control accounting: records |bytes| consumed by the application,
+// which may free receive window to advertise back to the peer.
+void ReliableQuicStream::AddBytesConsumed(uint64 bytes) {
+  if (flow_controller_.IsEnabled()) {
+    flow_controller_.AddBytesConsumed(bytes);
+    connection_flow_controller_->AddBytesConsumed(bytes);
+  }
}
-uint32 ReliableQuicStream::StripPriorityAndHeaderId(
- const char* data, uint32 data_len) {
- uint32 total_bytes_parsed = 0;
-
- if (!priority_parsed_ && session_->connection()->is_server()) {
- QuicPriority temporary_priority = priority_;
- total_bytes_parsed = StripUint32(
- data, data_len, &headers_id_and_priority_buffer_, &temporary_priority);
- if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) {
- priority_parsed_ = true;
-
- // Spdy priorities are inverted, so the highest numerical value is the
- // lowest legal priority.
- if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) {
- session_->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY);
- return 0;
- }
- priority_ = temporary_priority;
- }
- data += total_bytes_parsed;
- data_len -= total_bytes_parsed;
- }
- if (data_len > 0 && headers_id_ == 0u) {
- // The headers ID has not yet been read. Strip it from the beginning of
- // the data stream.
- total_bytes_parsed += StripUint32(
- data, data_len, &headers_id_and_priority_buffer_, &headers_id_);
- }
- return total_bytes_parsed;
+// True when either the stream or the connection has exhausted its send
+// window.
+bool ReliableQuicStream::IsFlowControlBlocked() {
+  return flow_controller_.IsBlocked() ||
+      connection_flow_controller_->IsBlocked();
}
} // namespace net