Upstream version 7.36.149.0
[platform/framework/web/crosswalk.git] / src / net / websockets / websocket_job.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/websockets/websocket_job.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/lazy_instance.h"
11 #include "net/base/io_buffer.h"
12 #include "net/base/net_errors.h"
13 #include "net/base/net_log.h"
14 #include "net/cookies/cookie_store.h"
15 #include "net/http/http_network_session.h"
16 #include "net/http/http_transaction_factory.h"
17 #include "net/http/http_util.h"
18 #include "net/spdy/spdy_session.h"
19 #include "net/spdy/spdy_session_pool.h"
20 #include "net/url_request/url_request_context.h"
21 #include "net/websockets/websocket_handshake_handler.h"
22 #include "net/websockets/websocket_net_log_params.h"
23 #include "net/websockets/websocket_throttle.h"
24 #include "url/gurl.h"
25
26 static const int kMaxPendingSendAllowed = 32768;  // 32 kilobytes.
27
28 namespace {
29
30 // lower-case header names.
31 const char* const kCookieHeaders[] = {
32   "cookie", "cookie2"
33 };
34 const char* const kSetCookieHeaders[] = {
35   "set-cookie", "set-cookie2"
36 };
37
38 net::SocketStreamJob* WebSocketJobFactory(
39     const GURL& url, net::SocketStream::Delegate* delegate,
40     net::URLRequestContext* context, net::CookieStore* cookie_store) {
41   net::WebSocketJob* job = new net::WebSocketJob(delegate);
42   job->InitSocketStream(new net::SocketStream(url, job, context, cookie_store));
43   return job;
44 }
45
46 class WebSocketJobInitSingleton {
47  private:
48   friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
49   WebSocketJobInitSingleton() {
50     net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
51     net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
52   }
53 };
54
55 static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init =
56     LAZY_INSTANCE_INITIALIZER;
57
58 }  // anonymous namespace
59
60 namespace net {
61
62 bool WebSocketJob::websocket_over_spdy_enabled_ = false;
63
64 // static
65 void WebSocketJob::EnsureInit() {
66   g_websocket_job_init.Get();
67 }
68
69 // static
70 void WebSocketJob::set_websocket_over_spdy_enabled(bool enabled) {
71   websocket_over_spdy_enabled_ = enabled;
72 }
73
74 WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
75     : delegate_(delegate),
76       state_(INITIALIZED),
77       waiting_(false),
78       handshake_request_(new WebSocketHandshakeRequestHandler),
79       handshake_response_(new WebSocketHandshakeResponseHandler),
80       started_to_send_handshake_request_(false),
81       handshake_request_sent_(0),
82       response_cookies_save_index_(0),
83       spdy_protocol_version_(0),
84       save_next_cookie_running_(false),
85       callback_pending_(false),
86       weak_ptr_factory_(this),
87       weak_ptr_factory_for_send_pending_(this) {
88 }
89
90 WebSocketJob::~WebSocketJob() {
91   DCHECK_EQ(CLOSED, state_);
92   DCHECK(!delegate_);
93   DCHECK(!socket_.get());
94 }
95
96 void WebSocketJob::Connect() {
97   DCHECK(socket_.get());
98   DCHECK_EQ(state_, INITIALIZED);
99   state_ = CONNECTING;
100   socket_->Connect();
101 }
102
103 bool WebSocketJob::SendData(const char* data, int len) {
104   switch (state_) {
105     case INITIALIZED:
106       return false;
107
108     case CONNECTING:
109       return SendHandshakeRequest(data, len);
110
111     case OPEN:
112       {
113         scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len);
114         memcpy(buffer->data(), data, len);
115         if (current_send_buffer_.get() || !send_buffer_queue_.empty()) {
116           send_buffer_queue_.push_back(buffer);
117           return true;
118         }
119         current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len);
120         return SendDataInternal(current_send_buffer_->data(),
121                                 current_send_buffer_->BytesRemaining());
122       }
123
124     case CLOSING:
125     case CLOSED:
126       return false;
127   }
128   return false;
129 }
130
131 void WebSocketJob::Close() {
132   if (state_ == CLOSED)
133     return;
134
135   state_ = CLOSING;
136   if (current_send_buffer_.get()) {
137     // Will close in SendPending.
138     return;
139   }
140   state_ = CLOSED;
141   CloseInternal();
142 }
143
144 void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) {
145   state_ = CONNECTING;
146   socket_->RestartWithAuth(credentials);
147 }
148
149 void WebSocketJob::DetachDelegate() {
150   state_ = CLOSED;
151   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
152
153   scoped_refptr<WebSocketJob> protect(this);
154   weak_ptr_factory_.InvalidateWeakPtrs();
155   weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs();
156
157   delegate_ = NULL;
158   if (socket_.get())
159     socket_->DetachDelegate();
160   socket_ = NULL;
161   if (!callback_.is_null()) {
162     waiting_ = false;
163     callback_.Reset();
164     Release();  // Balanced with OnStartOpenConnection().
165   }
166 }
167
168 int WebSocketJob::OnStartOpenConnection(
169     SocketStream* socket, const CompletionCallback& callback) {
170   DCHECK(callback_.is_null());
171   state_ = CONNECTING;
172
173   addresses_ = socket->address_list();
174   if (!WebSocketThrottle::GetInstance()->PutInQueue(this)) {
175     return ERR_WS_THROTTLE_QUEUE_TOO_LARGE;
176   }
177
178   if (delegate_) {
179     int result = delegate_->OnStartOpenConnection(socket, callback);
180     DCHECK_EQ(OK, result);
181   }
182   if (waiting_) {
183     // PutInQueue() may set |waiting_| true for throttling. In this case,
184     // Wakeup() will be called later.
185     callback_ = callback;
186     AddRef();  // Balanced when callback_ is cleared.
187     return ERR_IO_PENDING;
188   }
189   return TrySpdyStream();
190 }
191
192 void WebSocketJob::OnConnected(
193     SocketStream* socket, int max_pending_send_allowed) {
194   if (state_ == CLOSED)
195     return;
196   DCHECK_EQ(CONNECTING, state_);
197   if (delegate_)
198     delegate_->OnConnected(socket, max_pending_send_allowed);
199 }
200
201 void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
202   DCHECK_NE(INITIALIZED, state_);
203   DCHECK_GT(amount_sent, 0);
204   if (state_ == CLOSED)
205     return;
206   if (state_ == CONNECTING) {
207     OnSentHandshakeRequest(socket, amount_sent);
208     return;
209   }
210   if (delegate_) {
211     DCHECK(state_ == OPEN || state_ == CLOSING);
212     if (!current_send_buffer_.get()) {
213       VLOG(1)
214           << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent;
215       return;
216     }
217     current_send_buffer_->DidConsume(amount_sent);
218     if (current_send_buffer_->BytesRemaining() > 0)
219       return;
220
221     // We need to report amount_sent of original buffer size, instead of
222     // amount sent to |socket|.
223     amount_sent = current_send_buffer_->size();
224     DCHECK_GT(amount_sent, 0);
225     current_send_buffer_ = NULL;
226     if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) {
227       base::MessageLoopForIO::current()->PostTask(
228           FROM_HERE,
229           base::Bind(&WebSocketJob::SendPending,
230                      weak_ptr_factory_for_send_pending_.GetWeakPtr()));
231     }
232     delegate_->OnSentData(socket, amount_sent);
233   }
234 }
235
236 void WebSocketJob::OnReceivedData(
237     SocketStream* socket, const char* data, int len) {
238   DCHECK_NE(INITIALIZED, state_);
239   if (state_ == CLOSED)
240     return;
241   if (state_ == CONNECTING) {
242     OnReceivedHandshakeResponse(socket, data, len);
243     return;
244   }
245   DCHECK(state_ == OPEN || state_ == CLOSING);
246   if (delegate_ && len > 0)
247     delegate_->OnReceivedData(socket, data, len);
248 }
249
250 void WebSocketJob::OnClose(SocketStream* socket) {
251   state_ = CLOSED;
252   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
253
254   scoped_refptr<WebSocketJob> protect(this);
255   weak_ptr_factory_.InvalidateWeakPtrs();
256
257   SocketStream::Delegate* delegate = delegate_;
258   delegate_ = NULL;
259   socket_ = NULL;
260   if (!callback_.is_null()) {
261     waiting_ = false;
262     callback_.Reset();
263     Release();  // Balanced with OnStartOpenConnection().
264   }
265   if (delegate)
266     delegate->OnClose(socket);
267 }
268
269 void WebSocketJob::OnAuthRequired(
270     SocketStream* socket, AuthChallengeInfo* auth_info) {
271   if (delegate_)
272     delegate_->OnAuthRequired(socket, auth_info);
273 }
274
275 void WebSocketJob::OnSSLCertificateError(
276     SocketStream* socket, const SSLInfo& ssl_info, bool fatal) {
277   if (delegate_)
278     delegate_->OnSSLCertificateError(socket, ssl_info, fatal);
279 }
280
281 void WebSocketJob::OnError(const SocketStream* socket, int error) {
282   if (delegate_ && error != ERR_PROTOCOL_SWITCHED)
283     delegate_->OnError(socket, error);
284 }
285
286 void WebSocketJob::OnCreatedSpdyStream(int result) {
287   DCHECK(spdy_websocket_stream_.get());
288   DCHECK(socket_.get());
289   DCHECK_NE(ERR_IO_PENDING, result);
290
291   if (state_ == CLOSED) {
292     result = ERR_ABORTED;
293   } else if (result == OK) {
294     state_ = CONNECTING;
295     result = ERR_PROTOCOL_SWITCHED;
296   } else {
297     spdy_websocket_stream_.reset();
298   }
299
300   CompleteIO(result);
301 }
302
303 void WebSocketJob::OnSentSpdyHeaders() {
304   DCHECK_NE(INITIALIZED, state_);
305   if (state_ != CONNECTING)
306     return;
307   size_t original_length = handshake_request_->original_length();
308   handshake_request_.reset();
309   if (delegate_)
310     delegate_->OnSentData(socket_.get(), original_length);
311 }
312
313 void WebSocketJob::OnSpdyResponseHeadersUpdated(
314     const SpdyHeaderBlock& response_headers) {
315   DCHECK_NE(INITIALIZED, state_);
316   if (state_ != CONNECTING)
317     return;
318   // TODO(toyoshim): Fallback to non-spdy connection?
319   handshake_response_->ParseResponseHeaderBlock(response_headers,
320                                                 challenge_,
321                                                 spdy_protocol_version_);
322
323   SaveCookiesAndNotifyHeadersComplete();
324 }
325
326 void WebSocketJob::OnSentSpdyData(size_t bytes_sent) {
327   DCHECK_NE(INITIALIZED, state_);
328   DCHECK_NE(CONNECTING, state_);
329   if (state_ == CLOSED)
330     return;
331   if (!spdy_websocket_stream_.get())
332     return;
333   OnSentData(socket_.get(), static_cast<int>(bytes_sent));
334 }
335
336 void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) {
337   DCHECK_NE(INITIALIZED, state_);
338   DCHECK_NE(CONNECTING, state_);
339   if (state_ == CLOSED)
340     return;
341   if (!spdy_websocket_stream_.get())
342     return;
343   if (buffer) {
344     OnReceivedData(
345         socket_.get(), buffer->GetRemainingData(), buffer->GetRemainingSize());
346   } else {
347     OnReceivedData(socket_.get(), NULL, 0);
348   }
349 }
350
351 void WebSocketJob::OnCloseSpdyStream() {
352   spdy_websocket_stream_.reset();
353   OnClose(socket_.get());
354 }
355
356 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
357   DCHECK_EQ(state_, CONNECTING);
358   if (started_to_send_handshake_request_)
359     return false;
360   if (!handshake_request_->ParseRequest(data, len))
361     return false;
362
363   AddCookieHeaderAndSend();
364   return true;
365 }
366
367 void WebSocketJob::AddCookieHeaderAndSend() {
368   bool allow = true;
369   if (delegate_ && !delegate_->CanGetCookies(socket_.get(), GetURLForCookies()))
370     allow = false;
371
372   if (socket_.get() && delegate_ && state_ == CONNECTING) {
373     handshake_request_->RemoveHeaders(kCookieHeaders,
374                                       arraysize(kCookieHeaders));
375     if (allow && socket_->cookie_store()) {
376       // Add cookies, including HttpOnly cookies.
377       CookieOptions cookie_options;
378       cookie_options.set_include_httponly();
379       socket_->cookie_store()->GetCookiesWithOptionsAsync(
380           GetURLForCookies(), cookie_options,
381           base::Bind(&WebSocketJob::LoadCookieCallback,
382                      weak_ptr_factory_.GetWeakPtr()));
383     } else {
384       DoSendData();
385     }
386   }
387 }
388
389 void WebSocketJob::LoadCookieCallback(const std::string& cookie) {
390   if (!cookie.empty())
391     // TODO(tyoshino): Sending cookie means that connection doesn't need
392     // PRIVACY_MODE_ENABLED as cookies may be server-bound and channel id
393     // wouldn't negatively affect privacy anyway. Need to restart connection
394     // or refactor to determine cookie status prior to connecting.
395     handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
396   DoSendData();
397 }
398
399 void WebSocketJob::DoSendData() {
400   if (spdy_websocket_stream_.get()) {
401     scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
402     handshake_request_->GetRequestHeaderBlock(
403         socket_->url(), headers.get(), &challenge_, spdy_protocol_version_);
404     spdy_websocket_stream_->SendRequest(headers.Pass());
405   } else {
406     const std::string& handshake_request =
407         handshake_request_->GetRawRequest();
408     handshake_request_sent_ = 0;
409     socket_->net_log()->AddEvent(
410         NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
411         base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request));
412     socket_->SendData(handshake_request.data(),
413                       handshake_request.size());
414   }
415   // Just buffered in |handshake_request_|.
416   started_to_send_handshake_request_ = true;
417 }
418
419 void WebSocketJob::OnSentHandshakeRequest(
420     SocketStream* socket, int amount_sent) {
421   DCHECK_EQ(state_, CONNECTING);
422   handshake_request_sent_ += amount_sent;
423   DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
424   if (handshake_request_sent_ >= handshake_request_->raw_length()) {
425     // handshake request has been sent.
426     // notify original size of handshake request to delegate.
427     // Reset the handshake_request_ first in case this object is deleted by the
428     // delegate.
429     size_t original_length = handshake_request_->original_length();
430     handshake_request_.reset();
431     if (delegate_)
432       delegate_->OnSentData(socket, original_length);
433   }
434 }
435
436 void WebSocketJob::OnReceivedHandshakeResponse(
437     SocketStream* socket, const char* data, int len) {
438   DCHECK_EQ(state_, CONNECTING);
439   if (handshake_response_->HasResponse()) {
440     // If we already has handshake response, received data should be frame
441     // data, not handshake message.
442     received_data_after_handshake_.insert(
443         received_data_after_handshake_.end(), data, data + len);
444     return;
445   }
446
447   size_t response_length = handshake_response_->ParseRawResponse(data, len);
448   if (!handshake_response_->HasResponse()) {
449     // not yet. we need more data.
450     return;
451   }
452   // handshake message is completed.
453   std::string raw_response = handshake_response_->GetRawResponse();
454   socket_->net_log()->AddEvent(
455       NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
456       base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response));
457   if (len - response_length > 0) {
458     // If we received extra data, it should be frame data.
459     DCHECK(received_data_after_handshake_.empty());
460     received_data_after_handshake_.assign(data + response_length, data + len);
461   }
462   SaveCookiesAndNotifyHeadersComplete();
463 }
464
465 void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() {
466   // handshake message is completed.
467   DCHECK(handshake_response_->HasResponse());
468
469   // Extract cookies from the handshake response into a temporary vector.
470   response_cookies_.clear();
471   response_cookies_save_index_ = 0;
472
473   handshake_response_->GetHeaders(
474       kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
475
476   // Now, loop over the response cookies, and attempt to persist each.
477   SaveNextCookie();
478 }
479
480 void WebSocketJob::NotifyHeadersComplete() {
481   // Remove cookie headers, with malformed headers preserved.
482   // Actual handshake should be done in Blink.
483   handshake_response_->RemoveHeaders(
484       kSetCookieHeaders, arraysize(kSetCookieHeaders));
485   std::string handshake_response = handshake_response_->GetResponse();
486   handshake_response_.reset();
487   std::vector<char> received_data(handshake_response.begin(),
488                                   handshake_response.end());
489   received_data.insert(received_data.end(),
490                        received_data_after_handshake_.begin(),
491                        received_data_after_handshake_.end());
492   received_data_after_handshake_.clear();
493
494   state_ = OPEN;
495
496   DCHECK(!received_data.empty());
497   if (delegate_)
498     delegate_->OnReceivedData(
499         socket_.get(), &received_data.front(), received_data.size());
500
501   WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
502 }
503
504 void WebSocketJob::SaveNextCookie() {
505   if (!socket_.get() || !delegate_ || state_ != CONNECTING)
506     return;
507
508   callback_pending_ = false;
509   save_next_cookie_running_ = true;
510
511   if (socket_->cookie_store()) {
512     GURL url_for_cookies = GetURLForCookies();
513
514     CookieOptions options;
515     options.set_include_httponly();
516
517     // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since
518     // CookieMonster's asynchronous operation APIs queue the callback to run it
519     // on the thread where the API was called, there won't be race. I.e. unless
520     // the callback is run synchronously, it won't be run in parallel with this
521     // method.
522     while (!callback_pending_ &&
523            response_cookies_save_index_ < response_cookies_.size()) {
524       std::string cookie = response_cookies_[response_cookies_save_index_];
525       response_cookies_save_index_++;
526
527       if (!delegate_->CanSetCookie(
528               socket_.get(), url_for_cookies, cookie, &options))
529         continue;
530
531       callback_pending_ = true;
532       socket_->cookie_store()->SetCookieWithOptionsAsync(
533           url_for_cookies, cookie, options,
534           base::Bind(&WebSocketJob::OnCookieSaved,
535                      weak_ptr_factory_.GetWeakPtr()));
536     }
537   }
538
539   save_next_cookie_running_ = false;
540
541   if (callback_pending_)
542     return;
543
544   response_cookies_.clear();
545   response_cookies_save_index_ = 0;
546
547   NotifyHeadersComplete();
548 }
549
550 void WebSocketJob::OnCookieSaved(bool cookie_status) {
551   // Tell the caller of SetCookieWithOptionsAsync() that this completion
552   // callback is invoked.
553   // - If the caller checks callback_pending earlier than this callback, the
554   //   caller exits to let this method continue iteration.
555   // - Otherwise, the caller continues iteration.
556   callback_pending_ = false;
557
558   // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited
559   // the loop. Otherwise, return.
560   if (save_next_cookie_running_)
561     return;
562
563   SaveNextCookie();
564 }
565
566 GURL WebSocketJob::GetURLForCookies() const {
567   GURL url = socket_->url();
568   std::string scheme = socket_->is_secure() ? "https" : "http";
569   url::Replacements<char> replacements;
570   replacements.SetScheme(scheme.c_str(), url::Component(0, scheme.length()));
571   return url.ReplaceComponents(replacements);
572 }
573
574 const AddressList& WebSocketJob::address_list() const {
575   return addresses_;
576 }
577
578 int WebSocketJob::TrySpdyStream() {
579   if (!socket_.get())
580     return ERR_FAILED;
581
582   if (!websocket_over_spdy_enabled_)
583     return OK;
584
585   // Check if we have a SPDY session available.
586   HttpTransactionFactory* factory =
587       socket_->context()->http_transaction_factory();
588   if (!factory)
589     return OK;
590   scoped_refptr<HttpNetworkSession> session = factory->GetSession();
591   if (!session.get())
592     return OK;
593   SpdySessionPool* spdy_pool = session->spdy_session_pool();
594   PrivacyMode privacy_mode = socket_->privacy_mode();
595   const SpdySessionKey key(HostPortPair::FromURL(socket_->url()),
596                            socket_->proxy_server(), privacy_mode);
597   // Forbid wss downgrade to SPDY without SSL.
598   // TODO(toyoshim): Does it realize the same policy with HTTP?
599   base::WeakPtr<SpdySession> spdy_session =
600       spdy_pool->FindAvailableSession(key, *socket_->net_log());
601   if (!spdy_session)
602     return OK;
603
604   SSLInfo ssl_info;
605   bool was_npn_negotiated;
606   NextProto protocol_negotiated = kProtoUnknown;
607   bool use_ssl = spdy_session->GetSSLInfo(
608       &ssl_info, &was_npn_negotiated, &protocol_negotiated);
609   if (socket_->is_secure() && !use_ssl)
610     return OK;
611
612   // Create SpdyWebSocketStream.
613   spdy_protocol_version_ = spdy_session->GetProtocolVersion();
614   spdy_websocket_stream_.reset(new SpdyWebSocketStream(spdy_session, this));
615
616   int result = spdy_websocket_stream_->InitializeStream(
617       socket_->url(), MEDIUM, *socket_->net_log());
618   if (result == OK) {
619     OnConnected(socket_.get(), kMaxPendingSendAllowed);
620     return ERR_PROTOCOL_SWITCHED;
621   }
622   if (result != ERR_IO_PENDING) {
623     spdy_websocket_stream_.reset();
624     return OK;
625   }
626
627   return ERR_IO_PENDING;
628 }
629
630 void WebSocketJob::SetWaiting() {
631   waiting_ = true;
632 }
633
634 bool WebSocketJob::IsWaiting() const {
635   return waiting_;
636 }
637
638 void WebSocketJob::Wakeup() {
639   if (!waiting_)
640     return;
641   waiting_ = false;
642   DCHECK(!callback_.is_null());
643   base::MessageLoopForIO::current()->PostTask(
644       FROM_HERE,
645       base::Bind(&WebSocketJob::RetryPendingIO,
646                  weak_ptr_factory_.GetWeakPtr()));
647 }
648
649 void WebSocketJob::RetryPendingIO() {
650   int result = TrySpdyStream();
651
652   // In the case of ERR_IO_PENDING, CompleteIO() will be called from
653   // OnCreatedSpdyStream().
654   if (result != ERR_IO_PENDING)
655     CompleteIO(result);
656 }
657
658 void WebSocketJob::CompleteIO(int result) {
659   // |callback_| may be null if OnClose() or DetachDelegate() was called.
660   if (!callback_.is_null()) {
661     CompletionCallback callback = callback_;
662     callback_.Reset();
663     callback.Run(result);
664     Release();  // Balanced with OnStartOpenConnection().
665   }
666 }
667
668 bool WebSocketJob::SendDataInternal(const char* data, int length) {
669   if (spdy_websocket_stream_.get())
670     return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length);
671   if (socket_.get())
672     return socket_->SendData(data, length);
673   return false;
674 }
675
676 void WebSocketJob::CloseInternal() {
677   if (spdy_websocket_stream_.get())
678     spdy_websocket_stream_->Close();
679   if (socket_.get())
680     socket_->Close();
681 }
682
683 void WebSocketJob::SendPending() {
684   if (current_send_buffer_.get())
685     return;
686
687   // Current buffer has been sent. Try next if any.
688   if (send_buffer_queue_.empty()) {
689     // No more data to send.
690     if (state_ == CLOSING)
691       CloseInternal();
692     return;
693   }
694
695   scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front();
696   send_buffer_queue_.pop_front();
697   current_send_buffer_ =
698       new DrainableIOBuffer(next_buffer.get(), next_buffer->size());
699   SendDataInternal(current_send_buffer_->data(),
700                    current_send_buffer_->BytesRemaining());
701 }
702
703 }  // namespace net