1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/websockets/websocket_job.h"
10 #include "base/lazy_instance.h"
11 #include "net/base/io_buffer.h"
12 #include "net/base/net_errors.h"
13 #include "net/base/net_log.h"
14 #include "net/cookies/cookie_store.h"
15 #include "net/http/http_network_session.h"
16 #include "net/http/http_transaction_factory.h"
17 #include "net/http/http_util.h"
18 #include "net/spdy/spdy_session.h"
19 #include "net/spdy/spdy_session_pool.h"
20 #include "net/url_request/url_request_context.h"
21 #include "net/websockets/websocket_handshake_handler.h"
22 #include "net/websockets/websocket_net_log_params.h"
23 #include "net/websockets/websocket_throttle.h"
// Passed to OnConnected() as |max_pending_send_allowed| from TrySpdyStream().
26 static const int kMaxPendingSendAllowed = 32768; // 32 kilobytes.
30 // lower-case header names.
// These names are removed from the handshake request in
// AddCookieHeaderAndSend().
31 const char* const kCookieHeaders[] = {
// Response header names collected in SaveCookiesAndNotifyHeadersComplete()
// and removed again in NotifyHeadersComplete().
34 const char* const kSetCookieHeaders[] = {
35 "set-cookie", "set-cookie2"
// Protocol factory registered for the "ws" and "wss" schemes. Creates a
// WebSocketJob reporting to |delegate| and hands it a new SocketStream for
// |url| whose delegate is the job itself.
38 net::SocketStreamJob* WebSocketJobFactory(
39 const GURL& url, net::SocketStream::Delegate* delegate) {
40 net::WebSocketJob* job = new net::WebSocketJob(delegate);
41 job->InitSocketStream(new net::SocketStream(url, job));
// Helper whose constructor registers WebSocketJobFactory as the
// SocketStreamJob protocol factory for both "ws" and "wss". The friend
// declaration lets base::LazyInstance's default traits construct it.
45 class WebSocketJobInitSingleton {
47 friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
48 WebSocketJobInitSingleton() {
49 net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
50 net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
// Lazily constructed instance of the registration helper;
// WebSocketJob::EnsureInit() calls Get() on it.
54 static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init =
55 LAZY_INSTANCE_INITIALIZER;
57 } // anonymous namespace
// Static flag consulted by TrySpdyStream(); defaults to false and is changed
// through set_websocket_over_spdy_enabled().
61 bool WebSocketJob::websocket_over_spdy_enabled_ = false;
// Touches |g_websocket_job_init| so that its constructor, which registers the
// "ws"/"wss" protocol factories, has run.
64 void WebSocketJob::EnsureInit() {
65 g_websocket_job_init.Get();
// Sets the static flag that TrySpdyStream() checks before attempting to use
// a SPDY session.
69 void WebSocketJob::set_websocket_over_spdy_enabled(bool enabled) {
70 websocket_over_spdy_enabled_ = enabled;
// Constructs a job that reports to |delegate|. Allocates fresh handshake
// request/response handlers and zero/false-initializes the handshake-progress
// and cookie-saving bookkeeping. Two weak pointer factories are bound to
// |this|; the second is used only for posted SendPending() tasks.
73 WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
74 : delegate_(delegate),
77 handshake_request_(new WebSocketHandshakeRequestHandler),
78 handshake_response_(new WebSocketHandshakeResponseHandler),
79 started_to_send_handshake_request_(false),
80 handshake_request_sent_(0),
81 response_cookies_save_index_(0),
82 spdy_protocol_version_(0),
83 save_next_cookie_running_(false),
84 callback_pending_(false),
85 weak_ptr_factory_(this),
86 weak_ptr_factory_for_send_pending_(this) {
// Debug-checks that the job reached the CLOSED state and no longer holds a
// socket by the time it is destroyed.
89 WebSocketJob::~WebSocketJob() {
90 DCHECK_EQ(CLOSED, state_);
92 DCHECK(!socket_.get());
// Preconditions (debug-checked): a socket has been attached and the job is
// still in the INITIALIZED state.
95 void WebSocketJob::Connect() {
96 DCHECK(socket_.get());
97 DCHECK_EQ(state_, INITIALIZED);
// Sends |len| bytes starting at |data|.
// Handshake bytes are forwarded to SendHandshakeRequest(), which expects the
// CONNECTING state. Frame data is copied into a new buffer: if a buffer is
// already in flight or queued, the copy is appended to |send_buffer_queue_|;
// otherwise it becomes |current_send_buffer_| and is passed to
// SendDataInternal() immediately, whose result is returned.
102 bool WebSocketJob::SendData(const char* data, int len) {
108 return SendHandshakeRequest(data, len);
112 scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len);
113 memcpy(buffer->data(), data, len);
114 if (current_send_buffer_.get() || !send_buffer_queue_.empty()) {
115 send_buffer_queue_.push_back(buffer);
118 current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len);
119 return SendDataInternal(current_send_buffer_->data(),
120 current_send_buffer_->BytesRemaining());
// Requests that the connection be closed. The already-CLOSED case is checked
// first. While a send buffer is still in flight, the actual close is left to
// SendPending(), which handles the CLOSING state once the queue is empty.
130 void WebSocketJob::Close() {
131 if (state_ == CLOSED)
135 if (current_send_buffer_.get()) {
136 // Will close in SendPending.
// Forwards |credentials| to the underlying socket's RestartWithAuth().
143 void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) {
145 socket_->RestartWithAuth(credentials);
// Detaches this job from its delegate.
// Removes the job from the WebSocketThrottle queue, invalidates weak pointers
// from both factories, and detaches the socket's delegate. If an
// open-connection callback is still stored, the reference taken in
// OnStartOpenConnection() is released. |protect| holds a reference to |this|
// for the duration of the call.
148 void WebSocketJob::DetachDelegate() {
150 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
152 scoped_refptr<WebSocketJob> protect(this);
153 weak_ptr_factory_.InvalidateWeakPtrs();
154 weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs();
158 socket_->DetachDelegate();
160 if (!callback_.is_null()) {
163 Release(); // Balanced with OnStartOpenConnection().
// Records the socket's address list and puts this job into the
// WebSocketThrottle queue.
// Returns ERR_WS_THROTTLE_QUEUE_TOO_LARGE if the queue rejects the job.
// When the job has to wait, |callback| is stored, a self-reference is taken
// (released when |callback_| is cleared) and ERR_IO_PENDING is returned;
// Wakeup() resumes the job later. Otherwise the result of TrySpdyStream() is
// returned. The delegate's own OnStartOpenConnection() is expected to return
// OK (debug-checked).
167 int WebSocketJob::OnStartOpenConnection(
168 SocketStream* socket, const CompletionCallback& callback) {
169 DCHECK(callback_.is_null());
172 addresses_ = socket->address_list();
173 if (!WebSocketThrottle::GetInstance()->PutInQueue(this)) {
174 return ERR_WS_THROTTLE_QUEUE_TOO_LARGE;
178 int result = delegate_->OnStartOpenConnection(socket, callback);
179 DCHECK_EQ(OK, result);
182 // PutInQueue() may set |waiting_| true for throttling. In this case,
183 // Wakeup() will be called later.
184 callback_ = callback;
185 AddRef(); // Balanced when callback_ is cleared.
186 return ERR_IO_PENDING;
188 return TrySpdyStream();
// Relays the connected notification, including |max_pending_send_allowed|,
// to the delegate. The CLOSED state is checked first; otherwise the job is
// expected to be CONNECTING (debug-checked).
191 void WebSocketJob::OnConnected(
192 SocketStream* socket, int max_pending_send_allowed) {
193 if (state_ == CLOSED)
195 DCHECK_EQ(CONNECTING, state_);
197 delegate_->OnConnected(socket, max_pending_send_allowed);
// Handles notification that |amount_sent| (> 0) bytes were written.
// While CONNECTING the bytes are accounted to the handshake through
// OnSentHandshakeRequest(). In OPEN/CLOSING they are consumed from
// |current_send_buffer_|. Once that buffer is finished, it is released, a
// SendPending() task is posted (unless weak pointers for one are already
// outstanding) and the delegate is told the buffer's full size rather than
// the size of the last partial write.
200 void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
201 DCHECK_NE(INITIALIZED, state_);
202 DCHECK_GT(amount_sent, 0);
203 if (state_ == CLOSED)
205 if (state_ == CONNECTING) {
206 OnSentHandshakeRequest(socket, amount_sent);
210 DCHECK(state_ == OPEN || state_ == CLOSING);
211 if (!current_send_buffer_.get()) {
213 << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent;
216 current_send_buffer_->DidConsume(amount_sent);
217 if (current_send_buffer_->BytesRemaining() > 0)
220 // We need to report amount_sent of original buffer size, instead of
221 // amount sent to |socket|.
222 amount_sent = current_send_buffer_->size();
223 DCHECK_GT(amount_sent, 0);
224 current_send_buffer_ = NULL;
225 if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) {
226 base::MessageLoopForIO::current()->PostTask(
228 base::Bind(&WebSocketJob::SendPending,
229 weak_ptr_factory_for_send_pending_.GetWeakPtr()));
231 delegate_->OnSentData(socket, amount_sent);
// Dispatches |len| received bytes at |data| according to the current state.
// While CONNECTING the bytes are treated as handshake response data. In
// OPEN/CLOSING they are forwarded to the delegate, but only when a delegate
// is attached and |len| is positive.
235 void WebSocketJob::OnReceivedData(
236 SocketStream* socket, const char* data, int len) {
237 DCHECK_NE(INITIALIZED, state_);
238 if (state_ == CLOSED)
240 if (state_ == CONNECTING) {
241 OnReceivedHandshakeResponse(socket, data, len);
244 DCHECK(state_ == OPEN || state_ == CLOSING);
245 if (delegate_ && len > 0)
246 delegate_->OnReceivedData(socket, data, len);
// Handles closure of |socket|.
// Removes the job from the WebSocketThrottle queue and invalidates weak
// pointers from |weak_ptr_factory_|. If an open-connection callback is still
// stored, the reference taken in OnStartOpenConnection() is released. The
// delegate pointer is copied to a local first, and the final OnClose()
// notification goes through that copy. |protect| holds a reference to |this|
// for the duration of the call.
249 void WebSocketJob::OnClose(SocketStream* socket) {
251 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
253 scoped_refptr<WebSocketJob> protect(this);
254 weak_ptr_factory_.InvalidateWeakPtrs();
256 SocketStream::Delegate* delegate = delegate_;
259 if (!callback_.is_null()) {
262 Release(); // Balanced with OnStartOpenConnection().
265 delegate->OnClose(socket);
// Relays the authentication challenge |auth_info| to the delegate.
268 void WebSocketJob::OnAuthRequired(
269 SocketStream* socket, AuthChallengeInfo* auth_info) {
271 delegate_->OnAuthRequired(socket, auth_info);
// Relays the SSL certificate error (|ssl_info|, |fatal|) to the delegate.
274 void WebSocketJob::OnSSLCertificateError(
275 SocketStream* socket, const SSLInfo& ssl_info, bool fatal) {
277 delegate_->OnSSLCertificateError(socket, ssl_info, fatal);
// Relays |error| to the delegate if one is attached. ERR_PROTOCOL_SWITCHED,
// the code TrySpdyStream() returns after switching to a SPDY stream, is not
// reported.
280 void WebSocketJob::OnError(const SocketStream* socket, int error) {
281 if (delegate_ && error != ERR_PROTOCOL_SWITCHED)
282 delegate_->OnError(socket, error);
// Completion handler for asynchronous SPDY stream creation; |result| is never
// ERR_IO_PENDING (debug-checked). The result is remapped to ERR_ABORTED if
// the job was closed in the meantime and to ERR_PROTOCOL_SWITCHED on success.
// One path also discards |spdy_websocket_stream_|.
285 void WebSocketJob::OnCreatedSpdyStream(int result) {
286 DCHECK(spdy_websocket_stream_.get());
287 DCHECK(socket_.get());
288 DCHECK_NE(ERR_IO_PENDING, result);
290 if (state_ == CLOSED) {
291 result = ERR_ABORTED;
292 } else if (result == OK) {
294 result = ERR_PROTOCOL_SWITCHED;
296 spdy_websocket_stream_.reset();
// Called once the SPDY request headers have gone out. While CONNECTING,
// reports the original handshake request length to the delegate as sent data
// and then releases the request handler.
302 void WebSocketJob::OnSentSpdyHeaders() {
303 DCHECK_NE(INITIALIZED, state_);
304 if (state_ != CONNECTING)
307 delegate_->OnSentData(socket_.get(), handshake_request_->original_length());
308 handshake_request_.reset();
// While CONNECTING, feeds the SPDY |response_headers| block to the handshake
// response handler (using |spdy_protocol_version_|) and then continues with
// SaveCookiesAndNotifyHeadersComplete().
311 void WebSocketJob::OnSpdyResponseHeadersUpdated(
312 const SpdyHeaderBlock& response_headers) {
313 DCHECK_NE(INITIALIZED, state_);
314 if (state_ != CONNECTING)
316 // TODO(toyoshim): Fallback to non-spdy connection?
317 handshake_response_->ParseResponseHeaderBlock(response_headers,
319 spdy_protocol_version_);
321 SaveCookiesAndNotifyHeadersComplete();
// SPDY counterpart of the sent-data notification: after checking the state
// and that a SPDY stream exists, forwards to OnSentData() with |bytes_sent|
// converted to int.
324 void WebSocketJob::OnSentSpdyData(size_t bytes_sent) {
325 DCHECK_NE(INITIALIZED, state_);
326 DCHECK_NE(CONNECTING, state_);
327 if (state_ == CLOSED)
329 if (!spdy_websocket_stream_.get())
331 OnSentData(socket_.get(), static_cast<int>(bytes_sent));
// SPDY counterpart of the received-data notification. After checking the
// state and that a SPDY stream exists, the remaining bytes of |buffer| are
// passed on together with |socket_|; another path calls OnReceivedData() with
// a NULL, zero-length payload.
// NOTE(review): the callee receiving the buffer contents is presumably
// OnReceivedData() -- confirm.
334 void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) {
335 DCHECK_NE(INITIALIZED, state_);
336 DCHECK_NE(CONNECTING, state_);
337 if (state_ == CLOSED)
339 if (!spdy_websocket_stream_.get())
343 socket_.get(), buffer->GetRemainingData(), buffer->GetRemainingSize());
345 OnReceivedData(socket_.get(), NULL, 0);
// Drops the SPDY stream and then runs the regular OnClose() path for
// |socket_|.
349 void WebSocketJob::OnCloseSpdyStream() {
350 spdy_websocket_stream_.reset();
351 OnClose(socket_.get());
// Accepts the raw handshake request (|len| bytes at |data|) while CONNECTING.
// Checks whether sending has already started and whether the request parses,
// then continues with AddCookieHeaderAndSend().
354 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
355 DCHECK_EQ(state_, CONNECTING);
356 if (started_to_send_handshake_request_)
358 if (!handshake_request_->ParseRequest(data, len))
361 AddCookieHeaderAndSend();
// Prepares the cookie headers of the handshake request.
// The delegate is asked via CanGetCookies() whether cookies may be read for
// GetURLForCookies(). If the job is still CONNECTING with a socket and a
// delegate, any existing cookie headers are removed from the request. When
// cookies are allowed and the context has a cookie store, the cookies
// (HttpOnly included) are fetched asynchronously and delivered to
// LoadCookieCallback() through a weak pointer.
365 void WebSocketJob::AddCookieHeaderAndSend() {
367 if (delegate_ && !delegate_->CanGetCookies(socket_.get(), GetURLForCookies()))
370 if (socket_.get() && delegate_ && state_ == CONNECTING) {
371 handshake_request_->RemoveHeaders(kCookieHeaders,
372 arraysize(kCookieHeaders));
373 if (allow && socket_->context()->cookie_store()) {
374 // Add cookies, including HttpOnly cookies.
375 CookieOptions cookie_options;
376 cookie_options.set_include_httponly();
377 socket_->context()->cookie_store()->GetCookiesWithOptionsAsync(
378 GetURLForCookies(), cookie_options,
379 base::Bind(&WebSocketJob::LoadCookieCallback,
380 weak_ptr_factory_.GetWeakPtr()));
// Completion callback for the cookie lookup started in
// AddCookieHeaderAndSend(): adds |cookie| to the handshake request as a
// "Cookie" header through AppendHeaderIfMissing().
387 void WebSocketJob::LoadCookieCallback(const std::string& cookie) {
389 // TODO(tyoshino): Sending cookie means that connection doesn't need
390 // kPrivacyModeEnabled as cookies may be server-bound and channel id
391 // wouldn't negatively affect privacy anyway. Need to restart connection
392 // or refactor to determine cookie status prior to connecting.
393 handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
// Sends the prepared handshake request.
// With a SPDY stream the request is converted to a SPDY header block (which
// also fills |challenge_|) and sent with SendRequest(). Otherwise the raw
// request text is logged to the socket's NetLog and written to the socket,
// with |handshake_request_sent_| reset to 0 for progress tracking in
// OnSentHandshakeRequest(). Finally the handshake is marked as started.
397 void WebSocketJob::DoSendData() {
398 if (spdy_websocket_stream_.get()) {
399 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
400 handshake_request_->GetRequestHeaderBlock(
401 socket_->url(), headers.get(), &challenge_, spdy_protocol_version_);
402 spdy_websocket_stream_->SendRequest(headers.Pass());
404 const std::string& handshake_request =
405 handshake_request_->GetRawRequest();
406 handshake_request_sent_ = 0;
407 socket_->net_log()->AddEvent(
408 NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
409 base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request));
410 socket_->SendData(handshake_request.data(),
411 handshake_request.size());
413 // Just buffered in |handshake_request_|.
414 started_to_send_handshake_request_ = true;
// Accumulates |amount_sent| into |handshake_request_sent_| while CONNECTING.
// Once the whole raw request has been written, the delegate is told the
// original request length (not the raw length actually sent) and the request
// handler is released.
417 void WebSocketJob::OnSentHandshakeRequest(
418 SocketStream* socket, int amount_sent) {
419 DCHECK_EQ(state_, CONNECTING);
420 handshake_request_sent_ += amount_sent;
421 DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
422 if (handshake_request_sent_ >= handshake_request_->raw_length()) {
423 // handshake request has been sent.
424 // notify original size of handshake request to delegate.
426 delegate_->OnSentData(
428 handshake_request_->original_length());
429 handshake_request_.reset();
// Consumes |len| bytes at |data| received while CONNECTING.
// If the handshake response is already complete, the bytes are buffered in
// |received_data_after_handshake_|. Otherwise they are fed to the response
// parser. When the response becomes complete it is logged to the socket's
// NetLog, any bytes beyond the parsed response are stashed as frame data, and
// processing continues with SaveCookiesAndNotifyHeadersComplete().
433 void WebSocketJob::OnReceivedHandshakeResponse(
434 SocketStream* socket, const char* data, int len) {
435 DCHECK_EQ(state_, CONNECTING);
436 if (handshake_response_->HasResponse()) {
437 // If we already have a handshake response, received data should be frame
438 // data, not handshake message.
439 received_data_after_handshake_.insert(
440 received_data_after_handshake_.end(), data, data + len);
444 size_t response_length = handshake_response_->ParseRawResponse(data, len);
445 if (!handshake_response_->HasResponse()) {
446 // not yet. we need more data.
449 // handshake message is completed.
450 std::string raw_response = handshake_response_->GetRawResponse();
451 socket_->net_log()->AddEvent(
452 NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
453 base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response));
454 if (len - response_length > 0) {
455 // If we received extra data, it should be frame data.
456 DCHECK(received_data_after_handshake_.empty());
457 received_data_after_handshake_.assign(data + response_length, data + len);
459 SaveCookiesAndNotifyHeadersComplete();
// Starts cookie persistence for a completed handshake response: resets
// |response_cookies_| and its save index, then fills the vector with the
// values of the Set-Cookie style headers listed in kSetCookieHeaders.
462 void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() {
463 // handshake message is completed.
464 DCHECK(handshake_response_->HasResponse());
466 // Extract cookies from the handshake response into a temporary vector.
467 response_cookies_.clear();
468 response_cookies_save_index_ = 0;
470 handshake_response_->GetHeaders(
471 kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
473 // Now, loop over the response cookies, and attempt to persist each.
// Delivers the handshake response to the delegate.
// The Set-Cookie style headers are removed, the response text is taken and
// the response handler released. Any frame data buffered in
// |received_data_after_handshake_| is appended (and that buffer cleared), and
// the combined bytes go to the delegate in one OnReceivedData() call. The job
// is then removed from the WebSocketThrottle queue.
477 void WebSocketJob::NotifyHeadersComplete() {
478 // Remove cookie headers, with malformed headers preserved.
479 // Actual handshake should be done in Blink.
480 handshake_response_->RemoveHeaders(
481 kSetCookieHeaders, arraysize(kSetCookieHeaders));
482 std::string handshake_response = handshake_response_->GetResponse();
483 handshake_response_.reset();
484 std::vector<char> received_data(handshake_response.begin(),
485 handshake_response.end());
486 received_data.insert(received_data.end(),
487 received_data_after_handshake_.begin(),
488 received_data_after_handshake_.end());
489 received_data_after_handshake_.clear();
493 DCHECK(!received_data.empty());
495 delegate_->OnReceivedData(
496 socket_.get(), &received_data.front(), received_data.size());
498 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
// Persists the cookies collected in |response_cookies_|, starting at
// |response_cookies_save_index_|.
// Each cookie is offered to the delegate through CanSetCookie() and stored
// with SetCookieWithOptionsAsync() (HttpOnly allowed), with OnCookieSaved()
// as the completion callback. |callback_pending_| and
// |save_next_cookie_running_| coordinate with OnCookieSaved(): the loop keeps
// going while each store call completes synchronously and stops as soon as
// one is still pending after the call returns. When no callback is pending
// after the loop, the cookie list is cleared and NotifyHeadersComplete() is
// called.
501 void WebSocketJob::SaveNextCookie() {
502 if (!socket_.get() || !delegate_ || state_ != CONNECTING)
505 callback_pending_ = false;
506 save_next_cookie_running_ = true;
508 if (socket_->context()->cookie_store()) {
509 GURL url_for_cookies = GetURLForCookies();
511 CookieOptions options;
512 options.set_include_httponly();
514 // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since
515 // CookieMonster's asynchronous operation APIs queue the callback to run it
516 // on the thread where the API was called, there won't be race. I.e. unless
517 // the callback is run synchronously, it won't be run in parallel with this
519 while (!callback_pending_ &&
520 response_cookies_save_index_ < response_cookies_.size()) {
521 std::string cookie = response_cookies_[response_cookies_save_index_];
522 response_cookies_save_index_++;
524 if (!delegate_->CanSetCookie(
525 socket_.get(), url_for_cookies, cookie, &options))
528 callback_pending_ = true;
529 socket_->context()->cookie_store()->SetCookieWithOptionsAsync(
530 url_for_cookies, cookie, options,
531 base::Bind(&WebSocketJob::OnCookieSaved,
532 weak_ptr_factory_.GetWeakPtr()));
536 save_next_cookie_running_ = false;
538 if (callback_pending_)
541 response_cookies_.clear();
542 response_cookies_save_index_ = 0;
544 NotifyHeadersComplete();
// Completion callback for SetCookieWithOptionsAsync() issued by
// SaveNextCookie(). Clears |callback_pending_| and then checks
// |save_next_cookie_running_| to tell whether SaveNextCookie() is still on
// the stack (synchronous completion) or has already exited its loop.
547 void WebSocketJob::OnCookieSaved(bool cookie_status) {
548 // Tell the caller of SetCookieWithOptionsAsync() that this completion
549 // callback is invoked.
550 // - If the caller checks callback_pending earlier than this callback, the
551 // caller exits to let this method continue iteration.
552 // - Otherwise, the caller continues iteration.
553 callback_pending_ = false;
555 // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited
556 // the loop. Otherwise, return.
557 if (save_next_cookie_running_)
// Returns the socket's URL with its scheme replaced by "https" when the
// socket is secure and by "http" otherwise; the result is the URL used for
// cookie lookups and stores.
563 GURL WebSocketJob::GetURLForCookies() const {
564 GURL url = socket_->url();
565 std::string scheme = socket_->is_secure() ? "https" : "http";
566 url_canon::Replacements<char> replacements;
567 replacements.SetScheme(scheme.c_str(),
568 url_parse::Component(0, scheme.length()));
569 return url.ReplaceComponents(replacements);
// Accessor returning an AddressList by const reference.
// NOTE(review): presumably returns |addresses_|, which is recorded in
// OnStartOpenConnection() -- confirm.
572 const AddressList& WebSocketJob::address_list() const {
// Attempts to carry this WebSocket over an existing SPDY session.
// Requires the static |websocket_over_spdy_enabled_| flag. Looks up an
// available session in the SPDY session pool, keyed by the socket's
// host/port, proxy server and privacy mode. A secure socket is not carried
// over a session without SSL. A SpdyWebSocketStream is then created and
// initialized at MEDIUM priority. On the path that reports OnConnected() the
// function returns ERR_PROTOCOL_SWITCHED. A result other than ERR_IO_PENDING
// discards the stream. Otherwise ERR_IO_PENDING is returned.
576 int WebSocketJob::TrySpdyStream() {
580 if (!websocket_over_spdy_enabled_)
583 // Check if we have a SPDY session available.
584 HttpTransactionFactory* factory =
585 socket_->context()->http_transaction_factory();
588 scoped_refptr<HttpNetworkSession> session = factory->GetSession();
591 SpdySessionPool* spdy_pool = session->spdy_session_pool();
592 PrivacyMode privacy_mode = socket_->privacy_mode();
593 const SpdySessionKey key(HostPortPair::FromURL(socket_->url()),
594 socket_->proxy_server(), privacy_mode);
595 // Forbid wss downgrade to SPDY without SSL.
596 // TODO(toyoshim): Does it realize the same policy with HTTP?
597 base::WeakPtr<SpdySession> spdy_session =
598 spdy_pool->FindAvailableSession(key, *socket_->net_log());
603 bool was_npn_negotiated;
604 NextProto protocol_negotiated = kProtoUnknown;
605 bool use_ssl = spdy_session->GetSSLInfo(
606 &ssl_info, &was_npn_negotiated, &protocol_negotiated);
607 if (socket_->is_secure() && !use_ssl)
610 // Create SpdyWebSocketStream.
611 spdy_protocol_version_ = spdy_session->GetProtocolVersion();
612 spdy_websocket_stream_.reset(new SpdyWebSocketStream(spdy_session, this));
614 int result = spdy_websocket_stream_->InitializeStream(
615 socket_->url(), MEDIUM, *socket_->net_log());
617 OnConnected(socket_.get(), kMaxPendingSendAllowed);
618 return ERR_PROTOCOL_SWITCHED;
620 if (result != ERR_IO_PENDING) {
621 spdy_websocket_stream_.reset();
625 return ERR_IO_PENDING;
// NOTE(review): presumably marks the job as waiting in the throttle queue
// (the |waiting_| flag mentioned in OnStartOpenConnection()) -- confirm.
628 void WebSocketJob::SetWaiting() {
// NOTE(review): presumably reports whether the job is currently waiting in
// the throttle queue (the |waiting_| flag) -- confirm.
632 bool WebSocketJob::IsWaiting() const {
// Resumes a job that was held back by throttling: posts RetryPendingIO() to
// the current IO message loop, bound through a weak pointer. A pending
// |callback_| is required (debug-checked).
636 void WebSocketJob::Wakeup() {
640 DCHECK(!callback_.is_null());
641 base::MessageLoopForIO::current()->PostTask(
643 base::Bind(&WebSocketJob::RetryPendingIO,
644 weak_ptr_factory_.GetWeakPtr()));
// Posted by Wakeup(): retries TrySpdyStream() and acts on the result right
// away unless it is ERR_IO_PENDING, in which case completion is left to
// OnCreatedSpdyStream().
647 void WebSocketJob::RetryPendingIO() {
648 int result = TrySpdyStream();
650 // In the case of ERR_IO_PENDING, CompleteIO() will be called from
651 // OnCreatedSpdyStream().
652 if (result != ERR_IO_PENDING)
// Completes the pending open-connection request with |result|. If a callback
// is still stored, it is copied to a local, run with |result|, and the
// reference taken in OnStartOpenConnection() is released afterwards.
656 void WebSocketJob::CompleteIO(int result) {
657 // |callback_| may be null if OnClose() or DetachDelegate() was called.
658 if (!callback_.is_null()) {
659 CompletionCallback callback = callback_;
661 callback.Run(result);
662 Release(); // Balanced with OnStartOpenConnection().
// Writes |length| bytes at |data| to the active transport. With a SPDY stream
// the call counts as successful when the stream's SendData() returns
// ERR_IO_PENDING. Otherwise the socket's SendData() result is returned.
666 bool WebSocketJob::SendDataInternal(const char* data, int length) {
667 if (spdy_websocket_stream_.get())
668 return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length);
670 return socket_->SendData(data, length);
// Closes the active transport; when a SPDY stream exists, its Close() is
// called.
674 void WebSocketJob::CloseInternal() {
675 if (spdy_websocket_stream_.get())
676 spdy_websocket_stream_->Close();
// Posted from OnSentData() after a buffer has been fully written. Checks
// first whether a buffer is already in flight. With an empty queue, the
// CLOSING state is handled here (the deferred close mentioned in Close()).
// Otherwise the next queued buffer is popped, wrapped in a DrainableIOBuffer
// as the new |current_send_buffer_|, and handed to SendDataInternal().
681 void WebSocketJob::SendPending() {
682 if (current_send_buffer_.get())
685 // Current buffer has been sent. Try next if any.
686 if (send_buffer_queue_.empty()) {
687 // No more data to send.
688 if (state_ == CLOSING)
693 scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front();
694 send_buffer_queue_.pop_front();
695 current_send_buffer_ =
696 new DrainableIOBuffer(next_buffer.get(), next_buffer->size());
697 SendDataInternal(current_send_buffer_->data(),
698 current_send_buffer_->BytesRemaining());