1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/bidirectional_stream_quic_impl.h"
9 #include "base/functional/bind.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/memory/raw_ptr.h"
13 #include "base/task/single_thread_task_runner.h"
14 #include "base/timer/timer.h"
15 #include "net/http/bidirectional_stream_request_info.h"
16 #include "net/http/http_util.h"
17 #include "net/socket/next_proto.h"
18 #include "net/spdy/spdy_http_utils.h"
19 #include "net/third_party/quiche/src/quiche/quic/core/quic_connection.h"
20 #include "net/third_party/quiche/src/quiche/spdy/core/http2_header_block.h"
21 #include "quic_http_stream.h"
25 // Sets a boolean to a value, and restores it to the previous value once
26 // the saver goes out of scope.
27 class ScopedBoolSaver {
// |var| is dereferenced immediately to capture the current value, so it must
// be non-null.
29 ScopedBoolSaver(bool* var, bool new_val) : var_(var), old_val_(*var) {
// Writes the captured value back through |var_|; the pointed-to bool must
// therefore outlive the saver.
33 ~ScopedBoolSaver() { *var_ = old_val_; }
// Takes ownership of the session handle by moving it into |session_|.
41 BidirectionalStreamQuicImpl::BidirectionalStreamQuicImpl(
42 std::unique_ptr<QuicChromiumClientSession::Handle> session)
43 : session_(std::move(session)) {}
// Resets the underlying stream with QUIC_STREAM_CANCELLED on destruction.
// NOTE(review): the guard around the Reset() call is not visible here --
// confirm |stream_| is checked for null before it is dereferenced.
45 BidirectionalStreamQuicImpl::~BidirectionalStreamQuicImpl() {
48 stream_->Reset(quic::QUIC_STREAM_CANCELLED);
// Starts the stream: stores the caller's options and request info, asks the
// session for a QUIC stream, and routes the outcome through posted tasks.
52 void BidirectionalStreamQuicImpl::Start(
53 const BidirectionalStreamRequestInfo* request_info,
54 const NetLogWithSource& net_log,
55 bool send_request_headers_automatically,
56 BidirectionalStreamImpl::Delegate* delegate,
57 std::unique_ptr<base::OneShotTimer> timer,
58 const NetworkTrafficAnnotationTag& traffic_annotation) {
// Clears |may_invoke_callbacks_| for the duration of this call; the previous
// value is restored when |saver| is destroyed.
59 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
// Logs a warning only; this statement by itself does not stop the request.
62 DLOG_IF(WARNING, !session_->IsConnected())
63 << "Trying to start request headers after session has been closed.";
// Links the caller's net log to the session's net log source.
65 net_log.AddEventReferencingSource(
66 NetLogEventType::BIDIRECTIONAL_STREAM_BOUND_TO_QUIC_SESSION,
67 session_->net_log().source());
69 send_request_headers_automatically_ = send_request_headers_automatically;
// Only the raw pointer is stored; NOTE(review): presumably the caller keeps
// |request_info| alive for as long as this object uses it -- confirm.
71 request_info_ = request_info;
73 // Only allow SAFE methods to use early data, unless overridden by the caller.
74 bool use_early_data = HttpUtil::IsMethodSafe(request_info_->method);
75 use_early_data |= request_info_->allow_early_data_override;
// OnStreamReady is handed to the session as the completion callback, bound to
// a weak pointer to this object.
77 int rv = session_->RequestStream(
79 base::BindOnce(&BidirectionalStreamQuicImpl::OnStreamReady,
80 weak_factory_.GetWeakPtr()),
82 if (rv == ERR_IO_PENDING)
// Posts NotifyError instead of calling it inline. If 1-RTT keys are not
// available the reported error is ERR_QUIC_HANDSHAKE_FAILED rather than |rv|.
// NOTE(review): the condition selecting this branch is not visible here.
86 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
89 &BidirectionalStreamQuicImpl::NotifyError,
90 weak_factory_.GetWeakPtr(),
91 session_->OneRttKeysAvailable() ? rv : ERR_QUIC_HANDSHAKE_FAILED));
// Posts OnStreamReady(rv) so that it does not run from inside Start().
95 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
96 FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::OnStreamReady,
97 weak_factory_.GetWeakPtr(), rv));
// Writes the request headers through WriteHeaders() with callbacks disabled
// for the duration of the call. A NotifyError(rv) task is posted rather than
// run inline; NOTE(review): the condition guarding the post is not visible
// here -- presumably it only applies when |rv| is an error.
100 void BidirectionalStreamQuicImpl::SendRequestHeaders() {
101 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
102 int rv = WriteHeaders();
104 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
105 FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::NotifyError,
106 weak_factory_.GetWeakPtr(), rv));
// Builds a header block from |request_info_| and writes it to |stream_|.
// Expected to be called only while headers have not been sent yet (DCHECKed).
110 int BidirectionalStreamQuicImpl::WriteHeaders() {
111 DCHECK(!has_sent_headers_);
113 spdy::Http2HeaderBlock headers;
// Copy url, method and extra headers into a temporary HttpRequestInfo, which
// is the input CreateSpdyHeadersFromHttpRequest takes below.
114 HttpRequestInfo http_request_info;
115 http_request_info.url = request_info_->url;
116 http_request_info.method = request_info_->method;
117 http_request_info.extra_headers = request_info_->extra_headers;
119 CreateSpdyHeadersFromHttpRequest(http_request_info, absl::nullopt,
120 http_request_info.extra_headers, &headers);
// |end_stream_on_headers| is forwarded to the stream's WriteHeaders() as its
// second argument.
121 int rv = stream_->WriteHeaders(std::move(headers),
122 request_info_->end_stream_on_headers, nullptr);
// |rv| is added to the sent-header byte count and headers are marked as sent.
// NOTE(review): confirm this only runs when |rv| is non-negative; the guard is
// not visible here.
124 headers_bytes_sent_ += rv;
125 has_sent_headers_ = true;
// Reads response body data from |stream_|. Returns ERR_IO_PENDING when the
// read is still outstanding; OnReadDataComplete is the callback passed to
// ReadBody(), and in the pending case |buffer| and |buffer_len| are retained
// in |read_buffer_| / |read_buffer_len_|.
130 int BidirectionalStreamQuicImpl::ReadData(IOBuffer* buffer, int buffer_len) {
131 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
135 int rv = stream_->ReadBody(
137 base::BindOnce(&BidirectionalStreamQuicImpl::OnReadDataComplete,
138 weak_factory_.GetWeakPtr()));
139 if (rv == ERR_IO_PENDING) {
140 read_buffer_ = buffer;
141 read_buffer_len_ = buffer_len;
142 return ERR_IO_PENDING;
148 // If the write side is closed, OnFinRead() will call
149 // BidirectionalStreamQuicImpl::OnClose().
// Once the stream reports that reading is done, tell it the FIN was read.
150 if (stream_->IsDoneReading())
151 stream_->OnFinRead();
// Sends |buffers| (one entry in |lengths| per buffer) on the stream, writing
// the request headers first if they have not been sent yet. Completion of the
// data write is reported through OnSendDataComplete.
156 void BidirectionalStreamQuicImpl::SendvData(
157 const std::vector<scoped_refptr<IOBuffer>>& buffers,
158 const std::vector<int>& lengths,
160 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
161 DCHECK_EQ(buffers.size(), lengths.size());
// Sending on a closed stream is logged and ERR_UNEXPECTED is reported from a
// posted NotifyError task rather than inline.
163 if (!stream_->IsOpen()) {
164 LOG(ERROR) << "Trying to send data after stream has been closed.";
165 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
166 FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::NotifyError,
167 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
// The bundler object lives until the end of the enclosing scope.
// NOTE(review): presumably this lets headers and data be coalesced into the
// same packet(s) -- confirm against CreatePacketBundler().
171 std::unique_ptr<quic::QuicConnection::ScopedPacketFlusher> bundler(
172 session_->CreatePacketBundler());
// Headers are only expected to be unsent here when automatic header sending is
// disabled (DCHECKed below).
173 if (!has_sent_headers_) {
174 DCHECK(!send_request_headers_automatically_);
175 int rv = WriteHeaders();
177 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
178 FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::NotifyError,
179 weak_factory_.GetWeakPtr(), rv));
184 int rv = stream_->WritevStreamData(
185 buffers, lengths, end_stream,
186 base::BindOnce(&BidirectionalStreamQuicImpl::OnSendDataComplete,
187 weak_factory_.GetWeakPtr()));
// The write did not go pending: post OnSendDataComplete(rv) so that it is not
// run from inside SendvData().
189 if (rv != ERR_IO_PENDING) {
190 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
192 base::BindOnce(&BidirectionalStreamQuicImpl::OnSendDataComplete,
193 weak_factory_.GetWeakPtr(), rv));
// Returns |negotiated_protocol_|; OnReadInitialHeadersComplete assigns
// kProtoQUIC to that member.
197 NextProto BidirectionalStreamQuicImpl::GetProtocol() const {
198 return negotiated_protocol_;
// Returns the number of bytes consumed from the stream, or the
// |closed_stream_received_bytes_| snapshot.
// NOTE(review): the condition choosing between the two returns is not visible
// here -- presumably whether |stream_| is still set.
// NOTE(review): this path returns NumBytesConsumed() while ResetStream()
// snapshots stream_bytes_read(); confirm the difference is intended.
201 int64_t BidirectionalStreamQuicImpl::GetTotalReceivedBytes() const {
203 DCHECK_LE(stream_->NumBytesConsumed(), stream_->stream_bytes_read());
204 // Only count the uniquely received bytes.
205 return stream_->NumBytesConsumed();
207 return closed_stream_received_bytes_;
// Returns the stream's written byte count, or the |closed_stream_sent_bytes_|
// snapshot taken in ResetStream().
// NOTE(review): the condition choosing between the two returns is not visible
// here -- presumably whether |stream_| is still set.
210 int64_t BidirectionalStreamQuicImpl::GetTotalSentBytes() const {
212 return stream_->stream_bytes_written();
214 return closed_stream_sent_bytes_;
// Fills |load_timing_info| based on whether this was the first stream.
217 bool BidirectionalStreamQuicImpl::GetLoadTimingInfo(
218 LoadTimingInfo* load_timing_info) const {
// Start from the snapshot taken in ResetStream(), then use the stream's own
// answer. NOTE(review): the guard on the override is not visible here.
219 bool is_first_stream = closed_is_first_stream_;
221 is_first_stream = stream_->IsFirstStream();
// First stream: socket not reused, connect timing copied from
// |connect_timing_|. Otherwise the socket is reported as reused.
222 if (is_first_stream) {
223 load_timing_info->socket_reused = false;
224 load_timing_info->connect_timing = connect_timing_;
226 load_timing_info->socket_reused = true;
// Fills |details| with the connection info derived from the session's QUIC
// version and with the session's own error details.
231 void BidirectionalStreamQuicImpl::PopulateNetErrorDetails(
232 NetErrorDetails* details) {
234 details->connection_info =
235 QuicHttpStream::ConnectionInfoFromQuicVersion(session_->GetQuicVersion());
236 session_->PopulateNetErrorDetails(details);
// The stream's connection error is written only when 1-RTT keys are available
// and a stream exists.
237 if (session_->OneRttKeysAvailable() && stream_)
238 details->quic_connection_error = stream_->connection_error();
// Handles the result of the stream request; |rv| must not be ERR_IO_PENDING.
241 void BidirectionalStreamQuicImpl::OnStreamReady(int rv) {
242 DCHECK_NE(ERR_IO_PENDING, rv);
// Take the stream object from the session handle.
249 stream_ = session_->ReleaseStream();
// A stream that is already closed is reported as ERR_CONNECTION_CLOSED.
252 if (!stream_->IsOpen()) {
253 NotifyError(ERR_CONNECTION_CLOSED);
// Reading the response headers is started from a posted task.
257 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
259 base::BindOnce(&BidirectionalStreamQuicImpl::ReadInitialHeaders,
260 weak_factory_.GetWeakPtr()));
// Completion handler for the data write started in SendvData(). Crashes
// (CHECK) if reached while callbacks are disallowed; |rv| must not be
// ERR_IO_PENDING. Tells the delegate the data was sent.
265 void BidirectionalStreamQuicImpl::OnSendDataComplete(int rv) {
266 CHECK(may_invoke_callbacks_);
267 DCHECK_NE(ERR_IO_PENDING, rv);
274 delegate_->OnDataSent();
// Completion handler for ReadInitialHeaders(). Crashes (CHECK) if reached
// while callbacks are disallowed; |rv| must not be ERR_IO_PENDING.
277 void BidirectionalStreamQuicImpl::OnReadInitialHeadersComplete(int rv) {
278 CHECK(may_invoke_callbacks_);
279 DCHECK_NE(ERR_IO_PENDING, rv);
// |rv| is added to the received-header byte count, and the protocol and
// connect timing are recorded for later queries.
285 headers_bytes_received_ += rv;
286 negotiated_protocol_ = kProtoQUIC;
287 connect_timing_ = session_->GetConnectTiming();
// The trailing-headers read is posted before the delegate is told about the
// initial headers.
288 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
290 base::BindOnce(&BidirectionalStreamQuicImpl::ReadTrailingHeaders,
291 weak_factory_.GetWeakPtr()));
293 delegate_->OnHeadersReceived(initial_headers_);
// Starts reading the initial response headers from |stream_|.
// OnReadInitialHeadersComplete is passed as the callback, and is also called
// directly below whenever the stream returns something other than
// ERR_IO_PENDING.
296 void BidirectionalStreamQuicImpl::ReadInitialHeaders() {
297 int rv = stream_->ReadInitialHeaders(
299 base::BindOnce(&BidirectionalStreamQuicImpl::OnReadInitialHeadersComplete,
300 weak_factory_.GetWeakPtr()));
302 if (rv != ERR_IO_PENDING)
303 OnReadInitialHeadersComplete(rv);
// Starts reading the trailing headers from |stream_|.
// OnReadTrailingHeadersComplete is passed as the callback, and is also called
// directly below whenever the stream returns something other than
// ERR_IO_PENDING.
306 void BidirectionalStreamQuicImpl::ReadTrailingHeaders() {
307 int rv = stream_->ReadTrailingHeaders(
310 &BidirectionalStreamQuicImpl::OnReadTrailingHeadersComplete,
311 weak_factory_.GetWeakPtr()));
313 if (rv != ERR_IO_PENDING)
314 OnReadTrailingHeadersComplete(rv);
// Completion handler for ReadTrailingHeaders(). Crashes (CHECK) if reached
// while callbacks are disallowed; |rv| must not be ERR_IO_PENDING. |rv| is
// added to the received-header byte count and the trailers are handed to the
// delegate.
317 void BidirectionalStreamQuicImpl::OnReadTrailingHeadersComplete(int rv) {
318 CHECK(may_invoke_callbacks_);
319 DCHECK_NE(ERR_IO_PENDING, rv);
325 headers_bytes_received_ += rv;
328 delegate_->OnTrailersReceived(trailing_headers_);
// Completion handler for a body read that went pending in ReadData(). Crashes
// (CHECK) if reached while callbacks are disallowed.
331 void BidirectionalStreamQuicImpl::OnReadDataComplete(int rv) {
332 CHECK(may_invoke_callbacks_);
// Drop the buffer and length retained by ReadData().
334 read_buffer_ = nullptr;
335 read_buffer_len_ = 0;
337 // If the write side is closed, OnFinRead() will call
338 // BidirectionalStreamQuicImpl::OnClose().
339 if (stream_->IsDoneReading())
340 stream_->OnFinRead();
// |rv| is forwarded to the delegate unchanged.
348 delegate_->OnDataRead(rv);
// Reports |error| through NotifyErrorImpl() with notify_delegate_later set to
// false, i.e. the delegate is notified inline rather than from a posted task.
351 void BidirectionalStreamQuicImpl::NotifyError(int error) {
352 NotifyErrorImpl(error, /*notify_delegate_later*/ false);
// Records |error| as the response status, cancels outstanding weak-pointer
// bound callbacks, and notifies the delegate either from a posted task
// (|notify_delegate_later| true) or inline. |error| must be neither OK nor
// ERR_IO_PENDING.
355 void BidirectionalStreamQuicImpl::NotifyErrorImpl(int error,
356 bool notify_delegate_later) {
357 DCHECK_NE(OK, error);
358 DCHECK_NE(ERR_IO_PENDING, error);
362 response_status_ = error;
// The delegate pointer is copied into a local, and that copy is what gets
// passed to NotifyFailure below.
363 BidirectionalStreamImpl::Delegate* delegate = delegate_;
365 // Cancel any pending callback.
366 weak_factory_.InvalidateWeakPtrs();
// The weak pointer bound below is obtained after the invalidation above, so
// it is a fresh one for this posted task.
367 if (notify_delegate_later) {
368 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
370 base::BindOnce(&BidirectionalStreamQuicImpl::NotifyFailure,
371 weak_factory_.GetWeakPtr(), delegate, error));
373 NotifyFailure(delegate, error);
374 // |this| might be destroyed at this point.
// Delivers |error| to |delegate| via OnFailed(). Crashes (CHECK) if reached
// while callbacks are disallowed. Uses the |delegate| argument rather than the
// |delegate_| member.
379 void BidirectionalStreamQuicImpl::NotifyFailure(
380 BidirectionalStreamImpl::Delegate* delegate,
382 CHECK(may_invoke_callbacks_);
383 delegate->OnFailed(error);
384 // |this| might be destroyed at this point.
// Tells the delegate the stream is ready, first writing the request headers
// when automatic header sending is enabled. Crashes (CHECK) if reached while
// callbacks are disallowed.
387 void BidirectionalStreamQuicImpl::NotifyStreamReady() {
388 CHECK(may_invoke_callbacks_);
389 if (send_request_headers_automatically_) {
390 int rv = WriteHeaders();
// NOTE(review): the condition guarding this post is not visible here --
// presumably it only applies when |rv| is an error.
392 base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
393 FROM_HERE, base::BindOnce(&BidirectionalStreamQuicImpl::NotifyError,
394 weak_factory_.GetWeakPtr(), rv));
// The delegate is told whether the headers have already been sent.
400 delegate_->OnStreamReady(has_sent_headers_);
// Copies the stream's read/written byte counts and its first-stream flag into
// the closed_* members, which GetTotalReceivedBytes(), GetTotalSentBytes() and
// GetLoadTimingInfo() read.
// NOTE(review): the null guard on |stream_| is not visible here -- confirm.
403 void BidirectionalStreamQuicImpl::ResetStream() {
406 closed_stream_received_bytes_ = stream_->stream_bytes_read();
407 closed_stream_sent_bytes_ = stream_->stream_bytes_written();
408 closed_is_first_stream_ = stream_->IsFirstStream();