#include <list>
+#include "base/basictypes.h"
+#include "base/memory/ref_counted.h"
#include "base/strings/string_piece.h"
#include "net/base/iovec.h"
#include "net/base/net_export.h"
-#include "net/quic/quic_spdy_compressor.h"
-#include "net/quic/quic_spdy_decompressor.h"
+#include "net/quic/quic_ack_notifier.h"
+#include "net/quic/quic_flow_controller.h"
+#include "net/quic/quic_protocol.h"
#include "net/quic/quic_stream_sequencer.h"
+#include "net/quic/quic_types.h"
namespace net {
class ReliableQuicStreamPeer;
} // namespace test
-class IPEndPoint;
class QuicSession;
-class SSLInfo;
-#define ENDPOINT (is_server_ ? "Server: " : " Client: ")
-
-// All this does right now is send data to subclasses via the sequencer.
-class NET_EXPORT_PRIVATE ReliableQuicStream : public
- QuicSpdyDecompressor::Visitor {
+class NET_EXPORT_PRIVATE ReliableQuicStream {
public:
- // Visitor receives callbacks from the stream.
- class Visitor {
- public:
- Visitor() {}
-
- // Called when the stream is closed.
- virtual void OnClose(ReliableQuicStream* stream) = 0;
-
- protected:
- virtual ~Visitor() {}
-
- private:
- DISALLOW_COPY_AND_ASSIGN(Visitor);
- };
-
ReliableQuicStream(QuicStreamId id,
QuicSession* session);
virtual ~ReliableQuicStream();
- bool WillAcceptStreamFrame(const QuicStreamFrame& frame) const;
- virtual bool OnStreamFrame(const QuicStreamFrame& frame);
+ // Called when a (potentially duplicate) stream frame has been received
+ // for this stream.
+ virtual void OnStreamFrame(const QuicStreamFrame& frame);
+ // Called when the connection becomes writeable to allow the stream
+ // to write any pending data.
virtual void OnCanWrite();
// Called by the session just before the stream is deleted.
virtual void OnClose();
- // Called when we get a stream reset from the client.
- virtual void OnStreamReset(QuicRstStreamErrorCode error);
+ // Called when we get a stream reset from the peer.
+ virtual void OnStreamReset(const QuicRstStreamFrame& frame);
// Called when we get or send a connection close, and should immediately
// close the stream. This is not passed through the sequencer,
// but is handled immediately.
virtual void OnConnectionClosed(QuicErrorCode error, bool from_peer);
- // Called when we should process a stream termination or
- // stream close from the peer.
- virtual void TerminateFromPeer(bool half_close);
-
- virtual uint32 ProcessRawData(const char* data, uint32 data_len);
- virtual uint32 ProcessHeaderData();
-
- virtual uint32 ProcessData(const char* data, uint32 data_len) = 0;
+ // Called when the final data has been read.
+ virtual void OnFinRead();
- virtual bool OnDecompressedData(base::StringPiece data) OVERRIDE;
- virtual void OnDecompressionError() OVERRIDE;
+ virtual uint32 ProcessRawData(const char* data, uint32 data_len) = 0;
- // Called to close the stream from this end.
- virtual void Close(QuicRstStreamErrorCode error);
+ // Called to reset the stream from this end.
+ virtual void Reset(QuicRstStreamErrorCode error);
// Called to close the entire connection from this end.
virtual void CloseConnection(QuicErrorCode error);
virtual void CloseConnectionWithDetails(QuicErrorCode error,
const string& details);
- // This block of functions wraps the sequencer's functions of the same
- // name. These methods return uncompressed data until that has
- // been fully processed. Then they simply delegate to the sequencer.
- virtual size_t Readv(const struct iovec* iov, size_t iov_len);
- virtual int GetReadableRegions(iovec* iov, size_t iov_len);
- virtual bool IsHalfClosed() const;
- virtual bool HasBytesToRead() const;
-
- // Called by the session when a decompression blocked stream
- // becomes unblocked.
- virtual void OnDecompressorAvailable();
-
- // By default, this is the same as priority(), however it allows streams
- // to temporarily alter effective priority. For example if a SPDY stream has
- // compressed but not written headers it can write the headers with a higher
- // priority.
- virtual QuicPriority EffectivePriority() const;
+ // Returns the effective priority for the stream. This value may change
+ // during the life of the stream.
+ virtual QuicPriority EffectivePriority() const = 0;
QuicStreamId id() const { return id_; }
bool read_side_closed() const { return read_side_closed_; }
bool write_side_closed() const { return write_side_closed_; }
- uint64 stream_bytes_read() { return stream_bytes_read_; }
- uint64 stream_bytes_written() { return stream_bytes_written_; }
+ uint64 stream_bytes_read() const { return stream_bytes_read_; }
+ uint64 stream_bytes_written() const { return stream_bytes_written_; }
- const IPEndPoint& GetPeerAddress() const;
+ QuicVersion version() const;
- void set_visitor(Visitor* visitor) { visitor_ = visitor; }
+ void set_fin_sent(bool fin_sent) { fin_sent_ = fin_sent; }
+ void set_rst_sent(bool rst_sent) { rst_sent_ = rst_sent; }
- QuicSpdyCompressor* compressor();
+ void set_fec_policy(FecPolicy fec_policy) { fec_policy_ = fec_policy; }
+ FecPolicy fec_policy() const { return fec_policy_; }
- // Gets the SSL connection information.
- bool GetSSLInfo(SSLInfo* ssl_info);
+ // Adjust our flow control windows according to new offset in |frame|.
+ virtual void OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame);
- bool headers_decompressed() const { return headers_decompressed_; }
+ int num_frames_received() const;
+
+ int num_duplicate_frames_received() const;
+
+ QuicFlowController* flow_controller() { return &flow_controller_; }
+
+ // Called when we see a frame which could increase the highest offset.
+ // Returns true if the highest offset did increase.
+ bool MaybeIncreaseHighestReceivedOffset(uint64 new_offset);
+ // Called when bytese are sent to the peer.
+ void AddBytesSent(uint64 bytes);
+ // Called by the stream sequencer as bytes are consumed from the buffer.
+ // If our receive window has dropped below the threshold, then send a
+ // WINDOW_UPDATE frame.
+ void AddBytesConsumed(uint64 bytes);
+
+ // Returns true if the stream is flow control blocked, by the stream flow
+ // control window or the connection flow control window.
+ bool IsFlowControlBlocked();
+
+ // Returns true if we have received either a RST or a FIN - either of which
+ // gives a definitive number of bytes which the peer has sent. If this is not
+ // true on stream termination the session must keep track of the stream's byte
+ // offset until a definitive final value arrives.
+ bool HasFinalReceivedByteOffset() const {
+ return fin_received_ || rst_received_;
+ }
protected:
- // Returns a pair with the number of bytes consumed from data, and a boolean
- // indicating if the fin bit was consumed. This does not indicate the data
- // has been sent on the wire: it may have been turned into a packet and queued
- // if the socket was unexpectedly blocked.
- //
- // The default implementation always consumed all bytes and any fin, but
- // this behavior is not guaranteed for subclasses so callers should check the
- // return value.
- virtual QuicConsumedData WriteData(base::StringPiece data, bool fin);
+ // Sends as much of 'data' to the connection as the connection will consume,
+ // and then buffers any remaining data in queued_data_.
+ void WriteOrBufferData(
+ base::StringPiece data,
+ bool fin,
+ QuicAckNotifier::DelegateInterface* ack_notifier_delegate);
+
+ // Sends as many bytes in the first |count| buffers of |iov| to the connection
+ // as the connection will consume.
+ // If |ack_notifier_delegate| is provided, then it will be notified once all
+ // the ACKs for this write have been received.
+ // Returns the number of bytes consumed by the connection.
+ QuicConsumedData WritevData(
+ const struct iovec* iov,
+ int iov_count,
+ bool fin,
+ QuicAckNotifier::DelegateInterface* ack_notifier_delegate);
+
+ // Helper method that returns FecProtection to use for writes to the session.
+ FecProtection GetFecProtection();
// Close the read side of the socket. Further frames will not be accepted.
virtual void CloseReadSide();
// Close the write side of the socket. Further writes will fail.
void CloseWriteSide();
- bool HasBufferedData();
+ bool HasBufferedData() const;
- bool fin_buffered() { return fin_buffered_; }
+ bool fin_buffered() const { return fin_buffered_; }
+ const QuicSession* session() const { return session_; }
QuicSession* session() { return session_; }
- // Sets priority_ to priority. This should only be called before bytes are
- // written to the server.
- void set_priority(QuicPriority priority);
- // This is protected because external classes should use EffectivePriority
- // instead.
- QuicPriority priority() const { return priority_; }
-
- // Sends as much of 'data' to the connection as the connection will consume,
- // and then buffers any remaining data in queued_data_.
- // Returns (data.size(), true) as it always consumed all data: it returns for
- // convenience to have the same return type as WriteDataInternal.
- QuicConsumedData WriteOrBuffer(base::StringPiece data, bool fin);
+ const QuicStreamSequencer* sequencer() const { return &sequencer_; }
+ QuicStreamSequencer* sequencer() { return &sequencer_; }
- // Sends as much of 'data' to the connection as the connection will consume.
- // Returns the number of bytes consumed by the connection.
- QuicConsumedData WriteDataInternal(base::StringPiece data, bool fin);
+ // TODO(rjshade): Remove this method when removing QUIC_VERSION_19.
+ void DisableFlowControl() {
+ flow_controller_.Disable();
+ }
- // Sends as many bytes in the first |count| buffers of |iov| to the connection
- // as the connection will consume.
- // Returns the number of bytes consumed by the connection.
- QuicConsumedData WritevDataInternal(const struct iovec* iov,
- int iov_count,
- bool fin);
+ void DisableConnectionFlowControlForThisStream() {
+ stream_contributes_to_connection_flow_control_ = false;
+ }
private:
friend class test::ReliableQuicStreamPeer;
friend class QuicStreamUtils;
+ class ProxyAckNotifierDelegate;
+
+ struct PendingData {
+ PendingData(string data_in,
+ scoped_refptr<ProxyAckNotifierDelegate> delegate_in);
+ ~PendingData();
- uint32 StripPriorityAndHeaderId(const char* data, uint32 data_len);
+ string data;
+ // Delegate that should be notified when the pending data is acked.
+ // Can be nullptr.
+ scoped_refptr<ProxyAckNotifierDelegate> delegate;
+ };
+
+ // Calls MaybeSendBlocked on our flow controller, and connection level flow
+ // controller. If we are flow control blocked, marks this stream as write
+ // blocked.
+ void MaybeSendBlocked();
- std::list<string> queued_data_;
+ std::list<PendingData> queued_data_;
QuicStreamSequencer sequencer_;
QuicStreamId id_;
QuicSession* session_;
- // Optional visitor of this stream to be notified when the stream is closed.
- Visitor* visitor_;
// Bytes read and written refer to payload bytes only: they do not include
// framing, encryption overhead etc.
uint64 stream_bytes_read_;
uint64 stream_bytes_written_;
- // True if the headers have been completely decompresssed.
- bool headers_decompressed_;
- // The priority of the stream, once parsed.
- QuicPriority priority_;
- // ID of the header block sent by the peer, once parsed.
- QuicHeaderId headers_id_;
- // Buffer into which we write bytes from priority_ and headers_id_
- // until each is fully parsed.
- string headers_id_and_priority_buffer_;
- // Contains a copy of the decompressed headers_ until they are consumed
- // via ProcessData or Readv.
- string decompressed_headers_;
- // True if an error was encountered during decompression.
- bool decompression_failed_;
// Stream error code received from a RstStreamFrame or error code sent by the
// visitor or sequencer in the RstStreamFrame.
// True if the write side is closed, and further writes should fail.
bool write_side_closed_;
- // True if the priority has been read, false otherwise.
- bool priority_parsed_;
bool fin_buffered_;
bool fin_sent_;
+ // True if this stream has received (and the sequencer has accepted) a
+ // StreamFrame with the FIN set.
+ bool fin_received_;
+
+ // In combination with fin_sent_, used to ensure that a FIN and/or a RST is
+ // always sent before stream termination.
+ bool rst_sent_;
+
+ // True if this stream has received a RST stream frame.
+ bool rst_received_;
+
+ // FEC policy to be used for this stream.
+ FecPolicy fec_policy_;
+
// True if the session this stream is running under is a server session.
bool is_server_;
+
+ QuicFlowController flow_controller_;
+
+ // The connection level flow controller. Not owned.
+ QuicFlowController* connection_flow_controller_;
+
+ // Special streams, such as the crypto and headers streams, do not respect
+ // connection level flow control limits (but are stream level flow control
+ // limited).
+ bool stream_contributes_to_connection_flow_control_;
+
+ DISALLOW_COPY_AND_ASSIGN(ReliableQuicStream);
};
} // namespace net