Upstream version 7.36.149.0
[platform/framework/web/crosswalk.git] / src / net / quic / reliable_quic_stream.cc
index b17086b..218bb09 100644 (file)
 
 #include "net/quic/reliable_quic_stream.h"
 
+#include "base/logging.h"
+#include "net/quic/iovector.h"
+#include "net/quic/quic_flow_controller.h"
 #include "net/quic/quic_session.h"
-#include "net/quic/quic_spdy_decompressor.h"
-#include "net/spdy/write_blocked_list.h"
+#include "net/quic/quic_write_blocked_list.h"
 
 using base::StringPiece;
 using std::min;
 
 namespace net {
 
+#define ENDPOINT (is_server_ ? "Server: " : " Client: ")
+
 namespace {
 
-// This is somewhat arbitrary.  It's possible, but unlikely, we will either fail
-// to set a priority client-side, or cancel a stream before stripping the
-// priority from the wire server-side.  In either case, start out with a
-// priority in the middle.
-QuicPriority kDefaultPriority = 3;
-
-// Appends bytes from data into partial_data_buffer.  Once partial_data_buffer
-// reaches 4 bytes, copies the data into 'result' and clears
-// partial_data_buffer.
-// Returns the number of bytes consumed.
-uint32 StripUint32(const char* data, uint32 data_len,
-                   string* partial_data_buffer,
-                   uint32* result) {
-  DCHECK_GT(4u, partial_data_buffer->length());
-  size_t missing_size = 4 - partial_data_buffer->length();
-  if (data_len < missing_size) {
-    StringPiece(data, data_len).AppendToString(partial_data_buffer);
-    return data_len;
-  }
-  StringPiece(data, missing_size).AppendToString(partial_data_buffer);
-  DCHECK_EQ(4u, partial_data_buffer->length());
-  memcpy(result, partial_data_buffer->data(), 4);
-  partial_data_buffer->clear();
-  return missing_size;
+struct iovec MakeIovec(StringPiece data) {
+  struct iovec iov = {const_cast<char*>(data.data()),
+                      static_cast<size_t>(data.size())};
+  return iov;
 }
 
 }  // namespace
 
-ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
-                                       QuicSession* session)
+// Wrapper that aggregates OnAckNotifications for packets sent using
+// WriteOrBufferData and delivers them to the original
+// QuicAckNotifier::DelegateInterface after all bytes written using
+// WriteOrBufferData are acked.  This level of indirection is
+// necessary because the delegate interface provides no mechanism that
+// WriteOrBufferData can use to inform it that the write required
+// multiple WritevData calls or that only part of the data has been
+// sent out by the time ACKs start arriving.
+class ReliableQuicStream::ProxyAckNotifierDelegate
+    : public QuicAckNotifier::DelegateInterface {
+ public:
+  explicit ProxyAckNotifierDelegate(DelegateInterface* delegate)
+      : delegate_(delegate),
+        pending_acks_(0),
+        wrote_last_data_(false),
+        num_original_packets_(0),
+        num_original_bytes_(0),
+        num_retransmitted_packets_(0),
+        num_retransmitted_bytes_(0) {
+  }
+
+  virtual void OnAckNotification(int num_original_packets,
+                                 int num_original_bytes,
+                                 int num_retransmitted_packets,
+                                 int num_retransmitted_bytes,
+                                 QuicTime::Delta delta_largest_observed)
+      OVERRIDE {
+    DCHECK_LT(0, pending_acks_);
+    --pending_acks_;
+    num_original_packets_ += num_original_packets;
+    num_original_bytes_ += num_original_bytes;
+    num_retransmitted_packets_ += num_retransmitted_packets;
+    num_retransmitted_bytes_ += num_retransmitted_bytes;
+
+    if (wrote_last_data_ && pending_acks_ == 0) {
+      delegate_->OnAckNotification(num_original_packets_,
+                                   num_original_bytes_,
+                                   num_retransmitted_packets_,
+                                   num_retransmitted_bytes_,
+                                   delta_largest_observed);
+    }
+  }
+
+  void WroteData(bool last_data) {
+    DCHECK(!wrote_last_data_);
+    ++pending_acks_;
+    wrote_last_data_ = last_data;
+  }
+
+ protected:
+  // Delegates are ref counted.
+  virtual ~ProxyAckNotifierDelegate() {
+  }
+
+ private:
+  // Original delegate.  delegate_->OnAckNotification will be called when:
+  //   wrote_last_data_ == true and pending_acks_ == 0
+  scoped_refptr<DelegateInterface> delegate_;
+
+  // Number of outstanding acks.
+  int pending_acks_;
+
+  // True if no pending writes remain.
+  bool wrote_last_data_;
+
+  // Accumulators.
+  int num_original_packets_;
+  int num_original_bytes_;
+  int num_retransmitted_packets_;
+  int num_retransmitted_bytes_;
+
+  DISALLOW_COPY_AND_ASSIGN(ProxyAckNotifierDelegate);
+};
+
+ReliableQuicStream::PendingData::PendingData(
+    string data_in, scoped_refptr<ProxyAckNotifierDelegate> delegate_in)
+    : data(data_in), delegate(delegate_in) {
+}
+
+ReliableQuicStream::PendingData::~PendingData() {
+}
+
+ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session)
     : sequencer_(this),
       id_(id),
       session_(session),
-      visitor_(NULL),
       stream_bytes_read_(0),
       stream_bytes_written_(0),
-      headers_decompressed_(false),
-      priority_(kDefaultPriority),
-      headers_id_(0),
-      decompression_failed_(false),
       stream_error_(QUIC_STREAM_NO_ERROR),
       connection_error_(QUIC_NO_ERROR),
       read_side_closed_(false),
       write_side_closed_(false),
-      priority_parsed_(false),
       fin_buffered_(false),
       fin_sent_(false),
-      is_server_(session_->is_server()) {
+      rst_sent_(false),
+      is_server_(session_->is_server()),
+      flow_controller_(
+          session_->connection()->version(),
+          id_,
+          is_server_,
+          session_->config()->HasReceivedInitialFlowControlWindowBytes() ?
+              session_->config()->ReceivedInitialFlowControlWindowBytes() :
+              kDefaultFlowControlSendWindow,
+          session_->connection()->max_flow_control_receive_window_bytes(),
+          session_->connection()->max_flow_control_receive_window_bytes()),
+      connection_flow_controller_(session_->connection()->flow_controller()) {
 }
 
 ReliableQuicStream::~ReliableQuicStream() {
 }
 
-bool ReliableQuicStream::WillAcceptStreamFrame(
-    const QuicStreamFrame& frame) const {
+bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
   if (read_side_closed_) {
+    DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id;
+    // We don't want to be reading: blackhole the data.
     return true;
   }
+
   if (frame.stream_id != id_) {
     LOG(ERROR) << "Error!";
     return false;
   }
-  return sequencer_.WillAcceptStreamFrame(frame);
-}
 
-bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
-  DCHECK_EQ(frame.stream_id, id_);
-  if (read_side_closed_) {
-    DLOG(INFO) << ENDPOINT << "Ignoring frame " << frame.stream_id;
-    // We don't want to be reading: blackhole the data.
-    return true;
-  }
-  // Note: This count include duplicate data received.
-  stream_bytes_read_ += frame.data.length();
+  // This count include duplicate data received.
+  stream_bytes_read_ += frame.data.TotalBufferSize();
 
   bool accepted = sequencer_.OnStreamFrame(frame);
 
+  if (flow_controller_.FlowControlViolation() ||
+      connection_flow_controller_->FlowControlViolation()) {
+    session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR);
+    return false;
+  }
+  MaybeSendWindowUpdate();
+
   return accepted;
 }
 
-void ReliableQuicStream::OnStreamReset(QuicRstStreamErrorCode error) {
-  stream_error_ = error;
-  TerminateFromPeer(false);  // Full close.
+void ReliableQuicStream::MaybeSendWindowUpdate() {
+  flow_controller_.MaybeSendWindowUpdate(session()->connection());
+  connection_flow_controller_->MaybeSendWindowUpdate(session()->connection());
+}
+
+int ReliableQuicStream::num_frames_received() const {
+  return sequencer_.num_frames_received();
+}
+
+int ReliableQuicStream::num_duplicate_frames_received() const {
+  return sequencer_.num_duplicate_frames_received();
+}
+
+void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
+  stream_error_ = frame.error_code;
+  CloseWriteSide();
+  CloseReadSide();
 }
 
 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
@@ -110,29 +194,21 @@ void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
     connection_error_ = error;
   }
 
-  if (from_peer) {
-    TerminateFromPeer(false);
-  } else {
-    CloseWriteSide();
-    CloseReadSide();
-  }
+  CloseWriteSide();
+  CloseReadSide();
 }
 
-void ReliableQuicStream::TerminateFromPeer(bool half_close) {
-  if (!half_close) {
-    CloseWriteSide();
-  }
+void ReliableQuicStream::OnFinRead() {
+  DCHECK(sequencer_.IsClosed());
   CloseReadSide();
 }
 
-void ReliableQuicStream::Close(QuicRstStreamErrorCode error) {
+void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) {
+  DCHECK_NE(QUIC_STREAM_NO_ERROR, error);
   stream_error_ = error;
-  if (error != QUIC_STREAM_NO_ERROR)  {
-    // Sending a RstStream results in calling CloseStream.
-    session()->SendRstStream(id(), error);
-  } else {
-    session_->CloseStream(id());
-  }
+  // Sending a RstStream results in calling CloseStream.
+  session()->SendRstStream(id(), error, stream_bytes_written_);
+  rst_sent_ = true;
 }
 
 void ReliableQuicStream::CloseConnection(QuicErrorCode error) {
@@ -144,136 +220,148 @@ void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
   session()->connection()->SendConnectionCloseWithDetails(error, details);
 }
 
-size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) {
-  if (headers_decompressed_ && decompressed_headers_.empty()) {
-    return sequencer_.Readv(iov, iov_len);
-  }
-  size_t bytes_consumed = 0;
-  size_t iov_index = 0;
-  while (iov_index < iov_len &&
-         decompressed_headers_.length() > bytes_consumed) {
-    size_t bytes_to_read = min(iov[iov_index].iov_len,
-                               decompressed_headers_.length() - bytes_consumed);
-    char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
-    memcpy(iov_ptr,
-           decompressed_headers_.data() + bytes_consumed, bytes_to_read);
-    bytes_consumed += bytes_to_read;
-    ++iov_index;
-  }
-  decompressed_headers_.erase(0, bytes_consumed);
-  return bytes_consumed;
+QuicVersion ReliableQuicStream::version() const {
+  return session()->connection()->version();
 }
 
-int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) {
-  if (headers_decompressed_ && decompressed_headers_.empty()) {
-    return sequencer_.GetReadableRegions(iov, iov_len);
-  }
-  if (iov_len == 0) {
-    return 0;
+void ReliableQuicStream::WriteOrBufferData(
+    StringPiece data,
+    bool fin,
+    QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
+  if (data.empty() && !fin) {
+    LOG(DFATAL) << "data.empty() && !fin";
+    return;
   }
-  iov[0].iov_base = static_cast<void*>(
-      const_cast<char*>(decompressed_headers_.data()));
-  iov[0].iov_len = decompressed_headers_.length();
-  return 1;
-}
 
-bool ReliableQuicStream::IsHalfClosed() const {
-  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
-    return false;
+  if (fin_buffered_) {
+    LOG(DFATAL) << "Fin already buffered";
+    return;
   }
-  return sequencer_.IsHalfClosed();
-}
-
-bool ReliableQuicStream::HasBytesToRead() const {
-  return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
-}
-
-const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
-  return session_->peer_address();
-}
-
-QuicSpdyCompressor* ReliableQuicStream::compressor() {
-  return session_->compressor();
-}
-
-bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) {
-  return session_->GetSSLInfo(ssl_info);
-}
-
-QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) {
-  DCHECK(data.size() > 0 || fin);
-  return WriteOrBuffer(data, fin);
-}
-
 
-void ReliableQuicStream::set_priority(QuicPriority priority) {
-  DCHECK_EQ(0u, stream_bytes_written_);
-  priority_ = priority;
-}
-
-QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) {
-  DCHECK(!fin_buffered_);
+  scoped_refptr<ProxyAckNotifierDelegate> proxy_delegate;
+  if (ack_notifier_delegate != NULL) {
+    proxy_delegate = new ProxyAckNotifierDelegate(ack_notifier_delegate);
+  }
 
   QuicConsumedData consumed_data(0, false);
   fin_buffered_ = fin;
 
   if (queued_data_.empty()) {
-    consumed_data = WriteDataInternal(string(data.data(), data.length()), fin);
+    struct iovec iov(MakeIovec(data));
+    consumed_data = WritevData(&iov, 1, fin, proxy_delegate.get());
     DCHECK_LE(consumed_data.bytes_consumed, data.length());
   }
 
+  bool write_completed;
   // If there's unconsumed data or an unconsumed fin, queue it.
   if (consumed_data.bytes_consumed < data.length() ||
       (fin && !consumed_data.fin_consumed)) {
-    queued_data_.push_back(
-        string(data.data() + consumed_data.bytes_consumed,
-               data.length() - consumed_data.bytes_consumed));
+    StringPiece remainder(data.substr(consumed_data.bytes_consumed));
+    queued_data_.push_back(PendingData(remainder.as_string(), proxy_delegate));
+    write_completed = false;
+  } else {
+    write_completed = true;
   }
 
-  return QuicConsumedData(data.size(), true);
+  if ((proxy_delegate.get() != NULL) &&
+      (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed)) {
+    proxy_delegate->WroteData(write_completed);
+  }
 }
 
 void ReliableQuicStream::OnCanWrite() {
   bool fin = false;
   while (!queued_data_.empty()) {
-    const string& data = queued_data_.front();
+    PendingData* pending_data = &queued_data_.front();
+    ProxyAckNotifierDelegate* delegate = pending_data->delegate.get();
     if (queued_data_.size() == 1 && fin_buffered_) {
       fin = true;
     }
-    QuicConsumedData consumed_data = WriteDataInternal(data, fin);
-    if (consumed_data.bytes_consumed == data.size() &&
+    struct iovec iov(MakeIovec(pending_data->data));
+    QuicConsumedData consumed_data = WritevData(&iov, 1, fin, delegate);
+    if (consumed_data.bytes_consumed == pending_data->data.size() &&
         fin == consumed_data.fin_consumed) {
       queued_data_.pop_front();
+      if (delegate != NULL) {
+        delegate->WroteData(true);
+      }
     } else {
-      queued_data_.front().erase(0, consumed_data.bytes_consumed);
+      if (consumed_data.bytes_consumed > 0) {
+        pending_data->data.erase(0, consumed_data.bytes_consumed);
+        if (delegate != NULL) {
+          delegate->WroteData(false);
+        }
+      }
       break;
     }
   }
 }
 
-QuicConsumedData ReliableQuicStream::WriteDataInternal(
-    StringPiece data, bool fin) {
-  struct iovec iov = {const_cast<char*>(data.data()),
-                      static_cast<size_t>(data.size())};
-  return WritevDataInternal(&iov, 1, fin);
+void ReliableQuicStream::MaybeSendBlocked() {
+  flow_controller_.MaybeSendBlocked(session()->connection());
+  connection_flow_controller_->MaybeSendBlocked(session()->connection());
+  // If we are connection level flow control blocked, then add the stream
+  // to the write blocked list. It will be given a chance to write when a
+  // connection level WINDOW_UPDATE arrives.
+  if (connection_flow_controller_->IsBlocked() &&
+      !flow_controller_.IsBlocked()) {
+    session_->MarkWriteBlocked(id(), EffectivePriority());
+  }
 }
 
-QuicConsumedData ReliableQuicStream::WritevDataInternal(const struct iovec* iov,
-                                                        int iov_count,
-                                                        bool fin) {
+QuicConsumedData ReliableQuicStream::WritevData(
+    const struct iovec* iov,
+    int iov_count,
+    bool fin,
+    QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
   if (write_side_closed_) {
     DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
     return QuicConsumedData(0, false);
   }
 
-  size_t write_length = 0u;
-  for (int i = 0; i < iov_count; ++i) {
-    write_length += iov[i].iov_len;
+  // How much data we want to write.
+  size_t write_length = TotalIovecLength(iov, iov_count);
+
+  // A FIN with zero data payload should not be flow control blocked.
+  bool fin_with_zero_data = (fin && write_length == 0);
+
+  if (flow_controller_.IsEnabled()) {
+    // How much data we are allowed to write from flow control.
+    uint64 send_window = flow_controller_.SendWindowSize();
+    if (connection_flow_controller_->IsEnabled()) {
+      send_window =
+          min(send_window, connection_flow_controller_->SendWindowSize());
+    }
+
+    if (send_window == 0 && !fin_with_zero_data) {
+      // Quick return if we can't send anything.
+      MaybeSendBlocked();
+      return QuicConsumedData(0, false);
+    }
+
+    if (write_length > send_window) {
+      // Don't send the FIN if we aren't going to send all the data.
+      fin = false;
+
+      // Writing more data would be a violation of flow control.
+      write_length = send_window;
+    }
   }
-  QuicConsumedData consumed_data =
-      session()->WritevData(id(), iov, iov_count, stream_bytes_written_, fin);
+
+  // Fill an IOVector with bytes from the iovec.
+  IOVector data;
+  data.AppendIovecAtMostBytes(iov, iov_count, write_length);
+
+  QuicConsumedData consumed_data = session()->WritevData(
+      id(), data, stream_bytes_written_, fin, ack_notifier_delegate);
   stream_bytes_written_ += consumed_data.bytes_consumed;
+
+  AddBytesSent(consumed_data.bytes_consumed);
+
   if (consumed_data.bytes_consumed == write_length) {
+    if (!fin_with_zero_data) {
+      MaybeSendBlocked();
+    }
     if (fin && consumed_data.fin_consumed) {
       fin_sent_ = true;
       CloseWriteSide();
@@ -286,241 +374,99 @@ QuicConsumedData ReliableQuicStream::WritevDataInternal(const struct iovec* iov,
   return consumed_data;
 }
 
-QuicPriority ReliableQuicStream::EffectivePriority() const {
-  return priority();
-}
-
 void ReliableQuicStream::CloseReadSide() {
   if (read_side_closed_) {
     return;
   }
-  DLOG(INFO) << ENDPOINT << "Done reading from stream " << id();
+  DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
 
   read_side_closed_ = true;
   if (write_side_closed_) {
-    DLOG(INFO) << ENDPOINT << "Closing stream: " << id();
+    DVLOG(1) << ENDPOINT << "Closing stream: " << id();
     session_->CloseStream(id());
   }
 }
 
-uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
-  DCHECK_NE(0u, data_len);
-  if (id() == kCryptoStreamId) {
-    // The crypto stream does not use compression.
-    return ProcessData(data, data_len);
-  }
-
-  uint32 total_bytes_consumed = 0;
-  if (headers_id_ == 0u) {
-    total_bytes_consumed += StripPriorityAndHeaderId(data, data_len);
-    data += total_bytes_consumed;
-    data_len -= total_bytes_consumed;
-    if (data_len == 0 || !session_->connection()->connected()) {
-      return total_bytes_consumed;
-    }
-  }
-  DCHECK_NE(0u, headers_id_);
-
-  // Once the headers are finished, we simply pass the data through.
-  if (headers_decompressed_) {
-    // Some buffered header data remains.
-    if (!decompressed_headers_.empty()) {
-      ProcessHeaderData();
-    }
-    if (decompressed_headers_.empty()) {
-      DVLOG(1) << "Delegating procesing to ProcessData";
-      total_bytes_consumed += ProcessData(data, data_len);
-    }
-    return total_bytes_consumed;
-  }
-
-  QuicHeaderId current_header_id =
-      session_->decompressor()->current_header_id();
-  // Ensure that this header id looks sane.
-  if (headers_id_ < current_header_id ||
-      headers_id_ > kMaxHeaderIdDelta + current_header_id) {
-    DVLOG(1) << ENDPOINT
-             << "Invalid headers for stream: " << id()
-             << " header_id: " << headers_id_
-             << " current_header_id: " << current_header_id;
-    session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
-    return total_bytes_consumed;
-  }
-
-  // If we are head-of-line blocked on decompression, then back up.
-  if (current_header_id != headers_id_) {
-    session_->MarkDecompressionBlocked(headers_id_, id());
-    DVLOG(1) << ENDPOINT
-             << "Unable to decompress header data for stream: " << id()
-             << " header_id: " << headers_id_;
-    return total_bytes_consumed;
-  }
-
-  // Decompressed data will be delivered to decompressed_headers_.
-  size_t bytes_consumed = session_->decompressor()->DecompressData(
-      StringPiece(data, data_len), this);
-  DCHECK_NE(0u, bytes_consumed);
-  if (bytes_consumed > data_len) {
-    DCHECK(false) << "DecompressData returned illegal value";
-    OnDecompressionError();
-    return total_bytes_consumed;
-  }
-  total_bytes_consumed += bytes_consumed;
-  data += bytes_consumed;
-  data_len -= bytes_consumed;
-
-  if (decompression_failed_) {
-    // The session will have been closed in OnDecompressionError.
-    return total_bytes_consumed;
-  }
-
-  // Headers are complete if the decompressor has moved on to the
-  // next stream.
-  headers_decompressed_ =
-      session_->decompressor()->current_header_id() != headers_id_;
-  if (!headers_decompressed_) {
-    DCHECK_EQ(0u, data_len);
-  }
-
-  ProcessHeaderData();
-
-  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
-    return total_bytes_consumed;
+void ReliableQuicStream::CloseWriteSide() {
+  if (write_side_closed_) {
+    return;
   }
+  DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
 
-  // We have processed all of the decompressed data but we might
-  // have some more raw data to process.
-  if (data_len > 0) {
-    total_bytes_consumed += ProcessData(data, data_len);
+  write_side_closed_ = true;
+  if (read_side_closed_) {
+    DVLOG(1) << ENDPOINT << "Closing stream: " << id();
+    session_->CloseStream(id());
   }
+}
 
-  // The sequencer will push any additional buffered frames if this data
-  // has been completely consumed.
-  return total_bytes_consumed;
+bool ReliableQuicStream::HasBufferedData() const {
+  return !queued_data_.empty();
 }
 
-uint32 ReliableQuicStream::ProcessHeaderData() {
-  if (decompressed_headers_.empty()) {
-    return 0;
-  }
+void ReliableQuicStream::OnClose() {
+  CloseReadSide();
+  CloseWriteSide();
 
-  size_t bytes_processed = ProcessData(decompressed_headers_.data(),
-                                       decompressed_headers_.length());
-  if (bytes_processed == decompressed_headers_.length()) {
-    decompressed_headers_.clear();
-  } else {
-    decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
+  if (!fin_sent_ && !rst_sent_) {
+    // For flow control accounting, we must tell the peer how many bytes we have
+    // written on this stream before termination. Done here if needed, using a
+    // RST frame.
+    DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id();
+    session_->SendRstStream(id(), QUIC_RST_FLOW_CONTROL_ACCOUNTING,
+                            stream_bytes_written_);
+    rst_sent_ = true;
   }
-  return bytes_processed;
 }
 
-void ReliableQuicStream::OnDecompressorAvailable() {
-  DCHECK_EQ(headers_id_,
-            session_->decompressor()->current_header_id());
-  DCHECK(!headers_decompressed_);
-  DCHECK(!decompression_failed_);
-  DCHECK_EQ(0u, decompressed_headers_.length());
-
-  while (!headers_decompressed_) {
-    struct iovec iovec;
-    if (sequencer_.GetReadableRegions(&iovec, 1) == 0) {
-      return;
-    }
-
-    size_t bytes_consumed = session_->decompressor()->DecompressData(
-        StringPiece(static_cast<char*>(iovec.iov_base),
-                    iovec.iov_len),
-        this);
-    DCHECK_LE(bytes_consumed, iovec.iov_len);
-    if (decompression_failed_) {
-      return;
-    }
-    sequencer_.MarkConsumed(bytes_consumed);
-
-    headers_decompressed_ =
-        session_->decompressor()->current_header_id() != headers_id_;
+void ReliableQuicStream::OnWindowUpdateFrame(
+    const QuicWindowUpdateFrame& frame) {
+  if (!flow_controller_.IsEnabled()) {
+    DLOG(DFATAL) << "Flow control not enabled! " << version();
+    return;
   }
 
-  // Either the headers are complete, or the all data as been consumed.
-  ProcessHeaderData();  // Unprocessed headers remain in decompressed_headers_.
-  if (IsHalfClosed()) {
-    TerminateFromPeer(true);
-  } else if (headers_decompressed_ && decompressed_headers_.empty()) {
-    sequencer_.FlushBufferedFrames();
+  if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) {
+    // We can write again!
+    // TODO(rjshade): This does not respect priorities (e.g. multiple
+    //                outstanding POSTs are unblocked on arrival of
+    //                SHLO with initial window).
+    // As long as the connection is not flow control blocked, we can write!
+    OnCanWrite();
   }
 }
 
-bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
-  data.AppendToString(&decompressed_headers_);
-  return true;
-}
-
-void ReliableQuicStream::OnDecompressionError() {
-  DCHECK(!decompression_failed_);
-  decompression_failed_ = true;
-  session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
-}
-
-
-void ReliableQuicStream::CloseWriteSide() {
-  if (write_side_closed_) {
-    return;
+void ReliableQuicStream::AddBytesBuffered(uint64 bytes) {
+  if (flow_controller_.IsEnabled()) {
+    flow_controller_.AddBytesBuffered(bytes);
+    connection_flow_controller_->AddBytesBuffered(bytes);
   }
-  DLOG(INFO) << ENDPOINT << "Done writing to stream " << id();
+}
 
-  write_side_closed_ = true;
-  if (read_side_closed_) {
-    DLOG(INFO) << ENDPOINT << "Closing stream: " << id();
-    session_->CloseStream(id());
+void ReliableQuicStream::RemoveBytesBuffered(uint64 bytes) {
+  if (flow_controller_.IsEnabled()) {
+    flow_controller_.RemoveBytesBuffered(bytes);
+    connection_flow_controller_->RemoveBytesBuffered(bytes);
   }
 }
 
-bool ReliableQuicStream::HasBufferedData() {
-  return !queued_data_.empty();
+void ReliableQuicStream::AddBytesSent(uint64 bytes) {
+  if (flow_controller_.IsEnabled()) {
+    flow_controller_.AddBytesSent(bytes);
+    connection_flow_controller_->AddBytesSent(bytes);
+  }
 }
 
-void ReliableQuicStream::OnClose() {
-  CloseReadSide();
-  CloseWriteSide();
-
-  if (visitor_) {
-    Visitor* visitor = visitor_;
-    // Calling Visitor::OnClose() may result the destruction of the visitor,
-    // so we need to ensure we don't call it again.
-    visitor_ = NULL;
-    visitor->OnClose(this);
+void ReliableQuicStream::AddBytesConsumed(uint64 bytes) {
+  if (flow_controller_.IsEnabled()) {
+    flow_controller_.AddBytesConsumed(bytes);
+    connection_flow_controller_->AddBytesConsumed(bytes);
   }
 }
 
-uint32 ReliableQuicStream::StripPriorityAndHeaderId(
-    const char* data, uint32 data_len) {
-  uint32 total_bytes_parsed = 0;
-
-  if (!priority_parsed_ && session_->connection()->is_server()) {
-    QuicPriority temporary_priority = priority_;
-    total_bytes_parsed = StripUint32(
-        data, data_len, &headers_id_and_priority_buffer_, &temporary_priority);
-    if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) {
-      priority_parsed_ = true;
-
-      // Spdy priorities are inverted, so the highest numerical value is the
-      // lowest legal priority.
-      if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) {
-        session_->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY);
-        return 0;
-      }
-      priority_ = temporary_priority;
-    }
-    data += total_bytes_parsed;
-    data_len -= total_bytes_parsed;
-  }
-  if (data_len > 0 && headers_id_ == 0u) {
-    // The headers ID has not yet been read.  Strip it from the beginning of
-    // the data stream.
-    total_bytes_parsed += StripUint32(
-        data, data_len, &headers_id_and_priority_buffer_, &headers_id_);
-  }
-  return total_bytes_parsed;
+bool ReliableQuicStream::IsFlowControlBlocked() {
+  return flow_controller_.IsBlocked() ||
+         connection_flow_controller_->IsBlocked();
 }
 
 }  // namespace net