- add sources.
[platform/framework/web/crosswalk.git] / src / net / spdy / spdy_stream.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/spdy_stream.h"
6
7 #include "base/bind.h"
8 #include "base/compiler_specific.h"
9 #include "base/logging.h"
10 #include "base/message_loop/message_loop.h"
11 #include "base/strings/string_number_conversions.h"
12 #include "base/strings/stringprintf.h"
13 #include "base/values.h"
14 #include "net/spdy/spdy_buffer_producer.h"
15 #include "net/spdy/spdy_http_utils.h"
16 #include "net/spdy/spdy_session.h"
17
18 namespace net {
19
20 namespace {
21
22 base::Value* NetLogSpdyStreamErrorCallback(SpdyStreamId stream_id,
23                                            int status,
24                                            const std::string* description,
25                                            NetLog::LogLevel /* log_level */) {
26   base::DictionaryValue* dict = new base::DictionaryValue();
27   dict->SetInteger("stream_id", static_cast<int>(stream_id));
28   dict->SetInteger("status", status);
29   dict->SetString("description", *description);
30   return dict;
31 }
32
33 base::Value* NetLogSpdyStreamWindowUpdateCallback(
34     SpdyStreamId stream_id,
35     int32 delta,
36     int32 window_size,
37     NetLog::LogLevel /* log_level */) {
38   base::DictionaryValue* dict = new base::DictionaryValue();
39   dict->SetInteger("stream_id", stream_id);
40   dict->SetInteger("delta", delta);
41   dict->SetInteger("window_size", window_size);
42   return dict;
43 }
44
// Returns true if |str| holds at least one character in the ASCII range
// 'A'..'Z'. An empty string yields false.
bool ContainsUppercaseAscii(const std::string& str) {
  for (std::string::size_type pos = 0; pos < str.size(); ++pos) {
    const char c = str[pos];
    if ('A' <= c && c <= 'Z')
      return true;
  }
  return false;
}
53
54 }  // namespace
55
56 // A wrapper around a stream that calls into ProduceSynStreamFrame().
57 class SpdyStream::SynStreamBufferProducer : public SpdyBufferProducer {
58  public:
59   SynStreamBufferProducer(const base::WeakPtr<SpdyStream>& stream)
60       : stream_(stream) {
61     DCHECK(stream_.get());
62   }
63
64   virtual ~SynStreamBufferProducer() {}
65
66   virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE {
67     if (!stream_.get()) {
68       NOTREACHED();
69       return scoped_ptr<SpdyBuffer>();
70     }
71     DCHECK_GT(stream_->stream_id(), 0u);
72     return scoped_ptr<SpdyBuffer>(
73         new SpdyBuffer(stream_->ProduceSynStreamFrame()));
74   }
75
76  private:
77   const base::WeakPtr<SpdyStream> stream_;
78 };
79
80 SpdyStream::SpdyStream(SpdyStreamType type,
81                        const base::WeakPtr<SpdySession>& session,
82                        const GURL& url,
83                        RequestPriority priority,
84                        int32 initial_send_window_size,
85                        int32 initial_recv_window_size,
86                        const BoundNetLog& net_log)
87     : type_(type),
88       weak_ptr_factory_(this),
89       in_do_loop_(false),
90       continue_buffering_data_(type_ == SPDY_PUSH_STREAM),
91       stream_id_(0),
92       url_(url),
93       priority_(priority),
94       slot_(0),
95       send_stalled_by_flow_control_(false),
96       send_window_size_(initial_send_window_size),
97       recv_window_size_(initial_recv_window_size),
98       unacked_recv_window_bytes_(0),
99       session_(session),
100       delegate_(NULL),
101       send_status_(
102           (type_ == SPDY_PUSH_STREAM) ?
103           NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND),
104       request_time_(base::Time::Now()),
105       response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE),
106       io_state_((type_ == SPDY_PUSH_STREAM) ? STATE_IDLE : STATE_NONE),
107       response_status_(OK),
108       net_log_(net_log),
109       send_bytes_(0),
110       recv_bytes_(0),
111       just_completed_frame_type_(DATA),
112       just_completed_frame_size_(0) {
113   CHECK(type_ == SPDY_BIDIRECTIONAL_STREAM ||
114         type_ == SPDY_REQUEST_RESPONSE_STREAM ||
115         type_ == SPDY_PUSH_STREAM);
116   CHECK_GE(priority_, MINIMUM_PRIORITY);
117   CHECK_LE(priority_, MAXIMUM_PRIORITY);
118 }
119
120 SpdyStream::~SpdyStream() {
121   CHECK(!in_do_loop_);
122   UpdateHistograms();
123 }
124
125 void SpdyStream::SetDelegate(Delegate* delegate) {
126   CHECK(!delegate_);
127   CHECK(delegate);
128   delegate_ = delegate;
129
130   if (type_ == SPDY_PUSH_STREAM) {
131     DCHECK(continue_buffering_data_);
132     base::MessageLoop::current()->PostTask(
133         FROM_HERE,
134         base::Bind(&SpdyStream::PushedStreamReplayData, GetWeakPtr()));
135   }
136 }
137
138 void SpdyStream::PushedStreamReplayData() {
139   DCHECK_EQ(type_, SPDY_PUSH_STREAM);
140   DCHECK_NE(stream_id_, 0u);
141   DCHECK(continue_buffering_data_);
142
143   continue_buffering_data_ = false;
144
145   // The delegate methods called below may delete |this|, so use
146   // |weak_this| to detect that.
147   base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
148
149   CHECK(delegate_);
150   SpdyResponseHeadersStatus status =
151       delegate_->OnResponseHeadersUpdated(response_headers_);
152   if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) {
153     // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not
154     // have been closed. Since we don't have complete headers, assume
155     // we're waiting for another HEADERS frame, and we had better not
156     // have any pending data frames.
157     CHECK(weak_this);
158     if (!pending_buffers_.empty()) {
159       LogStreamError(ERR_SPDY_PROTOCOL_ERROR,
160                      "Data received with incomplete headers.");
161       session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
162     }
163     return;
164   }
165
166   // OnResponseHeadersUpdated() may have closed |this|.
167   if (!weak_this)
168     return;
169
170   response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
171
172   while (!pending_buffers_.empty()) {
173     // Take ownership of the first element of |pending_buffers_|.
174     scoped_ptr<SpdyBuffer> buffer(pending_buffers_.front());
175     pending_buffers_.weak_erase(pending_buffers_.begin());
176
177     bool eof = (buffer == NULL);
178
179     CHECK(delegate_);
180     delegate_->OnDataReceived(buffer.Pass());
181
182     // OnDataReceived() may have closed |this|.
183     if (!weak_this)
184       return;
185
186     if (eof) {
187       DCHECK(pending_buffers_.empty());
188       session_->CloseActiveStream(stream_id_, OK);
189       DCHECK(!weak_this);
190       // |pending_buffers_| is invalid at this point.
191       break;
192     }
193   }
194 }
195
196 scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() {
197   CHECK_EQ(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE);
198   CHECK(request_headers_);
199   CHECK_GT(stream_id_, 0u);
200
201   SpdyControlFlags flags =
202       (send_status_ == NO_MORE_DATA_TO_SEND) ?
203       CONTROL_FLAG_FIN : CONTROL_FLAG_NONE;
204   scoped_ptr<SpdyFrame> frame(session_->CreateSynStream(
205       stream_id_, priority_, slot_, flags, *request_headers_));
206   send_time_ = base::TimeTicks::Now();
207   return frame.Pass();
208 }
209
210 void SpdyStream::DetachDelegate() {
211   CHECK(!in_do_loop_);
212   DCHECK(!IsClosed());
213   delegate_ = NULL;
214   Cancel();
215 }
216
217 void SpdyStream::AdjustSendWindowSize(int32 delta_window_size) {
218   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
219
220   if (IsClosed())
221     return;
222
223   // Check for wraparound.
224   if (send_window_size_ > 0) {
225     DCHECK_LE(delta_window_size, kint32max - send_window_size_);
226   }
227   if (send_window_size_ < 0) {
228     DCHECK_GE(delta_window_size, kint32min - send_window_size_);
229   }
230   send_window_size_ += delta_window_size;
231   PossiblyResumeIfSendStalled();
232 }
233
234 void SpdyStream::OnWriteBufferConsumed(
235     size_t frame_payload_size,
236     size_t consume_size,
237     SpdyBuffer::ConsumeSource consume_source) {
238   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
239   if (consume_source == SpdyBuffer::DISCARD) {
240     // If we're discarding a frame or part of it, increase the send
241     // window by the number of discarded bytes. (Although if we're
242     // discarding part of a frame, it's probably because of a write
243     // error and we'll be tearing down the stream soon.)
244     size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
245     DCHECK_GT(remaining_payload_bytes, 0u);
246     IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
247   }
248   // For consumed bytes, the send window is increased when we receive
249   // a WINDOW_UPDATE frame.
250 }
251
252 void SpdyStream::IncreaseSendWindowSize(int32 delta_window_size) {
253   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
254   DCHECK_GE(delta_window_size, 1);
255
256   // Ignore late WINDOW_UPDATEs.
257   if (IsClosed())
258     return;
259
260   if (send_window_size_ > 0) {
261     // Check for overflow.
262     int32 max_delta_window_size = kint32max - send_window_size_;
263     if (delta_window_size > max_delta_window_size) {
264       std::string desc = base::StringPrintf(
265           "Received WINDOW_UPDATE [delta: %d] for stream %d overflows "
266           "send_window_size_ [current: %d]", delta_window_size, stream_id_,
267           send_window_size_);
268       session_->ResetStream(stream_id_, RST_STREAM_FLOW_CONTROL_ERROR, desc);
269       return;
270     }
271   }
272
273   send_window_size_ += delta_window_size;
274
275   net_log_.AddEvent(
276       NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW,
277       base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
278                  stream_id_, delta_window_size, send_window_size_));
279
280   PossiblyResumeIfSendStalled();
281 }
282
283 void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) {
284   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
285
286   if (IsClosed())
287     return;
288
289   // We only call this method when sending a frame. Therefore,
290   // |delta_window_size| should be within the valid frame size range.
291   DCHECK_GE(delta_window_size, 1);
292   DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
293
294   // |send_window_size_| should have been at least |delta_window_size| for
295   // this call to happen.
296   DCHECK_GE(send_window_size_, delta_window_size);
297
298   send_window_size_ -= delta_window_size;
299
300   net_log_.AddEvent(
301       NetLog::TYPE_SPDY_STREAM_UPDATE_SEND_WINDOW,
302       base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
303                  stream_id_, -delta_window_size, send_window_size_));
304 }
305
306 void SpdyStream::OnReadBufferConsumed(
307     size_t consume_size,
308     SpdyBuffer::ConsumeSource consume_source) {
309   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
310   DCHECK_GE(consume_size, 1u);
311   DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
312   IncreaseRecvWindowSize(static_cast<int32>(consume_size));
313 }
314
315 void SpdyStream::IncreaseRecvWindowSize(int32 delta_window_size) {
316   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
317
318   // By the time a read is processed by the delegate, this stream may
319   // already be inactive.
320   if (!session_->IsStreamActive(stream_id_))
321     return;
322
323   DCHECK_GE(unacked_recv_window_bytes_, 0);
324   DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_);
325   DCHECK_GE(delta_window_size, 1);
326   // Check for overflow.
327   DCHECK_LE(delta_window_size, kint32max - recv_window_size_);
328
329   recv_window_size_ += delta_window_size;
330   net_log_.AddEvent(
331       NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
332       base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
333                  stream_id_, delta_window_size, recv_window_size_));
334
335   unacked_recv_window_bytes_ += delta_window_size;
336   if (unacked_recv_window_bytes_ >
337       session_->stream_initial_recv_window_size() / 2) {
338     session_->SendStreamWindowUpdate(
339         stream_id_, static_cast<uint32>(unacked_recv_window_bytes_));
340     unacked_recv_window_bytes_ = 0;
341   }
342 }
343
344 void SpdyStream::DecreaseRecvWindowSize(int32 delta_window_size) {
345   DCHECK(session_->IsStreamActive(stream_id_));
346   DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
347   DCHECK_GE(delta_window_size, 1);
348
349   // Since we never decrease the initial receive window size,
350   // |delta_window_size| should never cause |recv_window_size_| to go
351   // negative. If we do, the receive window isn't being respected.
352   if (delta_window_size > recv_window_size_) {
353     session_->ResetStream(
354         stream_id_, RST_STREAM_PROTOCOL_ERROR,
355         "delta_window_size is " + base::IntToString(delta_window_size) +
356             " in DecreaseRecvWindowSize, which is larger than the receive " +
357             "window size of " + base::IntToString(recv_window_size_));
358     return;
359   }
360
361   recv_window_size_ -= delta_window_size;
362   net_log_.AddEvent(
363       NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
364       base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
365                  stream_id_, -delta_window_size, recv_window_size_));
366 }
367
368 int SpdyStream::GetPeerAddress(IPEndPoint* address) const {
369   return session_->GetPeerAddress(address);
370 }
371
372 int SpdyStream::GetLocalAddress(IPEndPoint* address) const {
373   return session_->GetLocalAddress(address);
374 }
375
376 bool SpdyStream::WasEverUsed() const {
377   return session_->WasEverUsed();
378 }
379
380 base::Time SpdyStream::GetRequestTime() const {
381   return request_time_;
382 }
383
384 void SpdyStream::SetRequestTime(base::Time t) {
385   request_time_ = t;
386 }
387
388 int SpdyStream::OnInitialResponseHeadersReceived(
389     const SpdyHeaderBlock& initial_response_headers,
390     base::Time response_time,
391     base::TimeTicks recv_first_byte_time) {
392   // SpdySession guarantees that this is called at most once.
393   CHECK(response_headers_.empty());
394
395   // Check to make sure that we don't receive the response headers
396   // before we're ready for it.
397   switch (type_) {
398     case SPDY_BIDIRECTIONAL_STREAM:
399       // For a bidirectional stream, we're ready for the response
400       // headers once we've finished sending the request headers.
401       if (io_state_ < STATE_IDLE) {
402         session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
403                               "Response received before request sent");
404         return ERR_SPDY_PROTOCOL_ERROR;
405       }
406       break;
407
408     case SPDY_REQUEST_RESPONSE_STREAM:
409       // For a request/response stream, we're ready for the response
410       // headers once we've finished sending the request headers and
411       // the request body (if we have one).
412       if ((io_state_ < STATE_IDLE) || (send_status_ == MORE_DATA_TO_SEND) ||
413           pending_send_data_.get()) {
414         session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
415                               "Response received before request sent");
416         return ERR_SPDY_PROTOCOL_ERROR;
417       }
418       break;
419
420     case SPDY_PUSH_STREAM:
421       // For a push stream, we're ready immediately.
422       DCHECK_EQ(send_status_, NO_MORE_DATA_TO_SEND);
423       DCHECK_EQ(io_state_, STATE_IDLE);
424       break;
425   }
426
427   metrics_.StartStream();
428
429   DCHECK_EQ(io_state_, STATE_IDLE);
430
431   response_time_ = response_time;
432   recv_first_byte_time_ = recv_first_byte_time;
433   return MergeWithResponseHeaders(initial_response_headers);
434 }
435
436 int SpdyStream::OnAdditionalResponseHeadersReceived(
437     const SpdyHeaderBlock& additional_response_headers) {
438   if (type_ == SPDY_REQUEST_RESPONSE_STREAM) {
439     session_->ResetStream(
440         stream_id_, RST_STREAM_PROTOCOL_ERROR,
441         "Additional headers received for request/response stream");
442     return ERR_SPDY_PROTOCOL_ERROR;
443   } else if (type_ == SPDY_PUSH_STREAM &&
444              response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) {
445     session_->ResetStream(
446         stream_id_, RST_STREAM_PROTOCOL_ERROR,
447         "Additional headers received for push stream");
448     return ERR_SPDY_PROTOCOL_ERROR;
449   }
450   return MergeWithResponseHeaders(additional_response_headers);
451 }
452
453 void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
454   DCHECK(session_->IsStreamActive(stream_id_));
455
456   // If we're still buffering data for a push stream, we will do the
457   // check for data received with incomplete headers in
458   // PushedStreamReplayData().
459   if (!delegate_ || continue_buffering_data_) {
460     DCHECK_EQ(type_, SPDY_PUSH_STREAM);
461     // It should be valid for this to happen in the server push case.
462     // We'll return received data when delegate gets attached to the stream.
463     if (buffer) {
464       pending_buffers_.push_back(buffer.release());
465     } else {
466       pending_buffers_.push_back(NULL);
467       metrics_.StopStream();
468       // Note: we leave the stream open in the session until the stream
469       //       is claimed.
470     }
471     return;
472   }
473
474   // If we have response headers but the delegate has indicated that
475   // it's still incomplete, then that's a protocol error.
476   if (response_headers_status_ == RESPONSE_HEADERS_ARE_INCOMPLETE) {
477     LogStreamError(ERR_SPDY_PROTOCOL_ERROR,
478                    "Data received with incomplete headers.");
479     session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
480     return;
481   }
482
483   CHECK(!IsClosed());
484
485   if (!buffer) {
486     metrics_.StopStream();
487     // Deletes |this|.
488     session_->CloseActiveStream(stream_id_, OK);
489     return;
490   }
491
492   size_t length = buffer->GetRemainingSize();
493   DCHECK_LE(length, session_->GetDataFrameMaximumPayload());
494   if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
495     DecreaseRecvWindowSize(static_cast<int32>(length));
496     buffer->AddConsumeCallback(
497         base::Bind(&SpdyStream::OnReadBufferConsumed, GetWeakPtr()));
498   }
499
500   // Track our bandwidth.
501   metrics_.RecordBytes(length);
502   recv_bytes_ += length;
503   recv_last_byte_time_ = base::TimeTicks::Now();
504
505   // May close |this|.
506   delegate_->OnDataReceived(buffer.Pass());
507 }
508
509 void SpdyStream::OnFrameWriteComplete(SpdyFrameType frame_type,
510                                       size_t frame_size) {
511   if (frame_size < session_->GetFrameMinimumSize() ||
512       frame_size > session_->GetFrameMaximumSize()) {
513     NOTREACHED();
514     return;
515   }
516   if (IsClosed())
517     return;
518   just_completed_frame_type_ = frame_type;
519   just_completed_frame_size_ = frame_size;
520   DoLoop(OK);
521 }
522
523 int SpdyStream::GetProtocolVersion() const {
524   return session_->GetProtocolVersion();
525 }
526
527 void SpdyStream::LogStreamError(int status, const std::string& description) {
528   net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ERROR,
529                     base::Bind(&NetLogSpdyStreamErrorCallback,
530                                stream_id_, status, &description));
531 }
532
533 void SpdyStream::OnClose(int status) {
534   CHECK(!in_do_loop_);
535   io_state_ = STATE_CLOSED;
536   response_status_ = status;
537   Delegate* delegate = delegate_;
538   delegate_ = NULL;
539   if (delegate)
540     delegate->OnClose(status);
541   // Unset |stream_id_| last so that the delegate can look it up.
542   stream_id_ = 0;
543 }
544
545 void SpdyStream::Cancel() {
546   CHECK(!in_do_loop_);
547   // We may be called again from a delegate's OnClose().
548   if (io_state_ == STATE_CLOSED)
549     return;
550
551   if (stream_id_ != 0) {
552     session_->ResetStream(stream_id_, RST_STREAM_CANCEL, std::string());
553   } else {
554     session_->CloseCreatedStream(GetWeakPtr(), RST_STREAM_CANCEL);
555   }
556   // |this| is invalid at this point.
557 }
558
559 void SpdyStream::Close() {
560   CHECK(!in_do_loop_);
561   // We may be called again from a delegate's OnClose().
562   if (io_state_ == STATE_CLOSED)
563     return;
564
565   if (stream_id_ != 0) {
566     session_->CloseActiveStream(stream_id_, OK);
567   } else {
568     session_->CloseCreatedStream(GetWeakPtr(), OK);
569   }
570   // |this| is invalid at this point.
571 }
572
573 base::WeakPtr<SpdyStream> SpdyStream::GetWeakPtr() {
574   return weak_ptr_factory_.GetWeakPtr();
575 }
576
577 int SpdyStream::SendRequestHeaders(scoped_ptr<SpdyHeaderBlock> request_headers,
578                                    SpdySendStatus send_status) {
579   CHECK_NE(type_, SPDY_PUSH_STREAM);
580   CHECK_EQ(send_status_, MORE_DATA_TO_SEND);
581   CHECK(!request_headers_);
582   CHECK(!pending_send_data_.get());
583   CHECK_EQ(io_state_, STATE_NONE);
584   request_headers_ = request_headers.Pass();
585   send_status_ = send_status;
586   io_state_ = STATE_SEND_REQUEST_HEADERS;
587   return DoLoop(OK);
588 }
589
590 void SpdyStream::SendData(IOBuffer* data,
591                           int length,
592                           SpdySendStatus send_status) {
593   CHECK_NE(type_, SPDY_PUSH_STREAM);
594   CHECK_EQ(send_status_, MORE_DATA_TO_SEND);
595   CHECK_GE(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE);
596   CHECK(!pending_send_data_.get());
597   pending_send_data_ = new DrainableIOBuffer(data, length);
598   send_status_ = send_status;
599   QueueNextDataFrame();
600 }
601
602 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info,
603                             bool* was_npn_negotiated,
604                             NextProto* protocol_negotiated) {
605   return session_->GetSSLInfo(
606       ssl_info, was_npn_negotiated, protocol_negotiated);
607 }
608
609 bool SpdyStream::GetSSLCertRequestInfo(SSLCertRequestInfo* cert_request_info) {
610   return session_->GetSSLCertRequestInfo(cert_request_info);
611 }
612
613 void SpdyStream::PossiblyResumeIfSendStalled() {
614   DCHECK(!IsClosed());
615
616   if (send_stalled_by_flow_control_ && !session_->IsSendStalled() &&
617       send_window_size_ > 0) {
618     net_log_.AddEvent(
619         NetLog::TYPE_SPDY_STREAM_FLOW_CONTROL_UNSTALLED,
620         NetLog::IntegerCallback("stream_id", stream_id_));
621     send_stalled_by_flow_control_ = false;
622     QueueNextDataFrame();
623   }
624 }
625
626 bool SpdyStream::IsClosed() const {
627   return io_state_ == STATE_CLOSED;
628 }
629
630 bool SpdyStream::IsIdle() const {
631   return io_state_ == STATE_IDLE;
632 }
633
634 NextProto SpdyStream::GetProtocol() const {
635   return session_->protocol();
636 }
637
638 bool SpdyStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
639   if (stream_id_ == 0)
640     return false;
641
642   return session_->GetLoadTimingInfo(stream_id_, load_timing_info);
643 }
644
645 GURL SpdyStream::GetUrlFromHeaders() const {
646   if (type_ != SPDY_PUSH_STREAM && !request_headers_)
647     return GURL();
648
649   const SpdyHeaderBlock& headers =
650       (type_ == SPDY_PUSH_STREAM) ? response_headers_ : *request_headers_;
651   return GetUrlFromHeaderBlock(headers, GetProtocolVersion(),
652                                type_ == SPDY_PUSH_STREAM);
653 }
654
655 bool SpdyStream::HasUrlFromHeaders() const {
656   return !GetUrlFromHeaders().is_empty();
657 }
658
659 int SpdyStream::DoLoop(int result) {
660   CHECK(!in_do_loop_);
661   in_do_loop_ = true;
662
663   do {
664     State state = io_state_;
665     io_state_ = STATE_NONE;
666     switch (state) {
667       case STATE_SEND_REQUEST_HEADERS:
668         CHECK_EQ(result, OK);
669         result = DoSendRequestHeaders();
670         break;
671       case STATE_SEND_REQUEST_HEADERS_COMPLETE:
672         CHECK_EQ(result, OK);
673         result = DoSendRequestHeadersComplete();
674         break;
675
676       // For request/response streams, no data is sent from the client
677       // while in the OPEN state, so OnFrameWriteComplete is never
678       // called here.  The HTTP body is handled in the OnDataReceived
679       // callback, which does not call into DoLoop.
680       //
681       // For bidirectional streams, we'll send and receive data once
682       // the connection is established.  Received data is handled in
683       // OnDataReceived.  Sent data is handled in
684       // OnFrameWriteComplete, which calls DoOpen().
685       case STATE_IDLE:
686         CHECK_EQ(result, OK);
687         result = DoOpen();
688         break;
689
690       case STATE_CLOSED:
691         DCHECK_NE(result, ERR_IO_PENDING);
692         break;
693       default:
694         NOTREACHED() << io_state_;
695         break;
696     }
697   } while (result != ERR_IO_PENDING && io_state_ != STATE_NONE &&
698            io_state_ != STATE_IDLE);
699
700   CHECK(in_do_loop_);
701   in_do_loop_ = false;
702
703   return result;
704 }
705
706 int SpdyStream::DoSendRequestHeaders() {
707   DCHECK_NE(type_, SPDY_PUSH_STREAM);
708   io_state_ = STATE_SEND_REQUEST_HEADERS_COMPLETE;
709
710   session_->EnqueueStreamWrite(
711       GetWeakPtr(), SYN_STREAM,
712       scoped_ptr<SpdyBufferProducer>(
713           new SynStreamBufferProducer(GetWeakPtr())));
714   return ERR_IO_PENDING;
715 }
716
717 namespace {
718
719 // Assuming we're in STATE_IDLE, maps the given type (which must not
720 // be SPDY_PUSH_STREAM) and send status to a result to return from
721 // DoSendRequestHeadersComplete() or DoOpen().
722 int GetOpenStateResult(SpdyStreamType type, SpdySendStatus send_status) {
723   switch (type) {
724     case SPDY_BIDIRECTIONAL_STREAM:
725       // For bidirectional streams, there's nothing else to do.
726       DCHECK_EQ(send_status, MORE_DATA_TO_SEND);
727       return OK;
728
729     case SPDY_REQUEST_RESPONSE_STREAM:
730       // For request/response streams, wait for the delegate to send
731       // data if there's request data to send; we'll get called back
732       // when the send finishes.
733       if (send_status == MORE_DATA_TO_SEND)
734         return ERR_IO_PENDING;
735
736       return OK;
737
738     case SPDY_PUSH_STREAM:
739       // This should never be called for push streams.
740       break;
741   }
742
743   CHECK(false);
744   return ERR_UNEXPECTED;
745 }
746
747 }  // namespace
748
749 int SpdyStream::DoSendRequestHeadersComplete() {
750   DCHECK_NE(type_, SPDY_PUSH_STREAM);
751   DCHECK_EQ(just_completed_frame_type_, SYN_STREAM);
752   DCHECK_NE(stream_id_, 0u);
753
754   io_state_ = STATE_IDLE;
755
756   CHECK(delegate_);
757   // Must not close |this|; if it does, it will trigger the |in_do_loop_|
758   // check in the destructor.
759   delegate_->OnRequestHeadersSent();
760
761   return GetOpenStateResult(type_, send_status_);
762 }
763
764 int SpdyStream::DoOpen() {
765   DCHECK_NE(type_, SPDY_PUSH_STREAM);
766
767   if (just_completed_frame_type_ != DATA) {
768     NOTREACHED();
769     return ERR_UNEXPECTED;
770   }
771
772   if (just_completed_frame_size_ < session_->GetDataFrameMinimumSize()) {
773     NOTREACHED();
774     return ERR_UNEXPECTED;
775   }
776
777   size_t frame_payload_size =
778       just_completed_frame_size_ - session_->GetDataFrameMinimumSize();
779   if (frame_payload_size > session_->GetDataFrameMaximumPayload()) {
780     NOTREACHED();
781     return ERR_UNEXPECTED;
782   }
783
784   // Set |io_state_| first as |delegate_| may check it.
785   io_state_ = STATE_IDLE;
786
787   send_bytes_ += frame_payload_size;
788
789   pending_send_data_->DidConsume(frame_payload_size);
790   if (pending_send_data_->BytesRemaining() > 0) {
791     QueueNextDataFrame();
792     return ERR_IO_PENDING;
793   }
794
795   pending_send_data_ = NULL;
796
797   CHECK(delegate_);
798   // Must not close |this|; if it does, it will trigger the
799   // |in_do_loop_| check in the destructor.
800   delegate_->OnDataSent();
801
802   return GetOpenStateResult(type_, send_status_);
803 }
804
805 void SpdyStream::UpdateHistograms() {
806   // We need at least the receive timers to be filled in, as otherwise
807   // metrics can be bogus.
808   if (recv_first_byte_time_.is_null() || recv_last_byte_time_.is_null())
809     return;
810
811   base::TimeTicks effective_send_time;
812   if (type_ == SPDY_PUSH_STREAM) {
813     // Push streams shouldn't have |send_time_| filled in.
814     DCHECK(send_time_.is_null());
815     effective_send_time = recv_first_byte_time_;
816   } else {
817     // For non-push streams, we also need |send_time_| to be filled
818     // in.
819     if (send_time_.is_null())
820       return;
821     effective_send_time = send_time_;
822   }
823
824   UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
825                       recv_first_byte_time_ - effective_send_time);
826   UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
827                       recv_last_byte_time_ - recv_first_byte_time_);
828   UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
829                       recv_last_byte_time_ - effective_send_time);
830
831   UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
832   UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
833 }
834
835 void SpdyStream::QueueNextDataFrame() {
836   // Until the request has been completely sent, we cannot be sure
837   // that our stream_id is correct.
838   DCHECK_GT(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE);
839   CHECK_GT(stream_id_, 0u);
840   CHECK(pending_send_data_.get());
841   CHECK_GT(pending_send_data_->BytesRemaining(), 0);
842
843   SpdyDataFlags flags =
844       (send_status_ == NO_MORE_DATA_TO_SEND) ?
845       DATA_FLAG_FIN : DATA_FLAG_NONE;
846   scoped_ptr<SpdyBuffer> data_buffer(
847       session_->CreateDataBuffer(stream_id_,
848                                  pending_send_data_.get(),
849                                  pending_send_data_->BytesRemaining(),
850                                  flags));
851   // We'll get called again by PossiblyResumeIfSendStalled().
852   if (!data_buffer)
853     return;
854
855   if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
856     DCHECK_GE(data_buffer->GetRemainingSize(),
857               session_->GetDataFrameMinimumSize());
858     size_t payload_size =
859         data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize();
860     DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload());
861     DecreaseSendWindowSize(static_cast<int32>(payload_size));
862     // This currently isn't strictly needed, since write frames are
863     // discarded only if the stream is about to be closed. But have it
864     // here anyway just in case this changes.
865     data_buffer->AddConsumeCallback(
866         base::Bind(&SpdyStream::OnWriteBufferConsumed,
867                    GetWeakPtr(), payload_size));
868   }
869
870   session_->EnqueueStreamWrite(
871       GetWeakPtr(), DATA,
872       scoped_ptr<SpdyBufferProducer>(
873           new SimpleBufferProducer(data_buffer.Pass())));
874 }
875
876 int SpdyStream::MergeWithResponseHeaders(
877     const SpdyHeaderBlock& new_response_headers) {
878   if (new_response_headers.find("transfer-encoding") !=
879       new_response_headers.end()) {
880     session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
881                          "Received transfer-encoding header");
882     return ERR_SPDY_PROTOCOL_ERROR;
883   }
884
885   for (SpdyHeaderBlock::const_iterator it = new_response_headers.begin();
886       it != new_response_headers.end(); ++it) {
887     // Disallow uppercase headers.
888     if (ContainsUppercaseAscii(it->first)) {
889       session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
890                             "Upper case characters in header: " + it->first);
891       return ERR_SPDY_PROTOCOL_ERROR;
892     }
893
894     SpdyHeaderBlock::iterator it2 = response_headers_.lower_bound(it->first);
895     // Disallow duplicate headers.  This is just to be conservative.
896     if (it2 != response_headers_.end() && it2->first == it->first) {
897       session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
898                             "Duplicate header: " + it->first);
899       return ERR_SPDY_PROTOCOL_ERROR;
900     }
901
902     response_headers_.insert(it2, *it);
903   }
904
905   // If delegate_ is not yet attached, we'll call
906   // OnResponseHeadersUpdated() after the delegate gets attached to
907   // the stream.
908   if (delegate_) {
909     // The call to OnResponseHeadersUpdated() below may delete |this|,
910     // so use |weak_this| to detect that.
911     base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
912
913     SpdyResponseHeadersStatus status =
914         delegate_->OnResponseHeadersUpdated(response_headers_);
915     if (status == RESPONSE_HEADERS_ARE_INCOMPLETE) {
916       // Since RESPONSE_HEADERS_ARE_INCOMPLETE was returned, we must not
917       // have been closed.
918       CHECK(weak_this);
919       // Incomplete headers are OK only for push streams.
920       if (type_ != SPDY_PUSH_STREAM) {
921         session_->ResetStream(stream_id_, RST_STREAM_PROTOCOL_ERROR,
922                               "Incomplete headers");
923         return ERR_INCOMPLETE_SPDY_HEADERS;
924       }
925     } else if (weak_this) {
926       response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
927     }
928   }
929
930   return OK;
931 }
932
933 }  // namespace net